This is an automated email from the ASF dual-hosted git repository.

quinn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new c8f24ca  CAMEL-12210 - Improve detection of the end of an MLLP Envelope
c8f24ca is described below

commit c8f24cac46f9c4a02a869597a0e7ba03565aa9fd
Author: Quinn Stevenson <qu...@apache.org>
AuthorDate: Mon Jan 29 15:58:00 2018 -0700

    CAMEL-12210 - Improve detection of the end of an MLLP Envelope
---
 .../component/mllp/MllpTcpServerConsumer.java      |  55 ++++--
 .../component/mllp/internal/MllpSocketBuffer.java  |  35 +++-
 ...eptRunnable.java => TcpServerAcceptThread.java} |  24 ++-
 .../TcpServerConsumerValidationRunnable.java       | 196 +++++++++++++++++++++
 .../mllp/internal/TcpSocketConsumerRunnable.java   |  45 +++--
 ...TcpClientProducerIdleConnectionTimeoutTest.java |   8 +-
 .../mllp/MllpTcpServerConsumerConnectionTest.java  |   5 +-
 ...onsumerOptionalEndOfDataWithValidationTest.java |  13 +-
 ...umerOptionalEndOfDataWithoutValidationTest.java |  14 +-
 ...onsumerRequiredEndOfDataWithValidationTest.java |  15 +-
 ...umerRequiredEndOfDataWithoutValidationTest.java |  12 +-
 ...tProducerEndOfDataAndValidationTestSupport.java |   6 +-
 ...rConsumerEndOfDataAndValidationTestSupport.java |  63 ++++---
 .../mllp/internal/MllpSocketBufferTest.java        |   4 +-
 .../mllp/internal/MllpSocketBufferWriteTest.java   |   4 +-
 .../src/test/resources/log4j2.properties           |   2 +-
 16 files changed, 412 insertions(+), 89 deletions(-)

diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index bc4f41d..72b00bd 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -36,29 +37,27 @@ import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.component.mllp.internal.MllpSocketBuffer;
-import org.apache.camel.component.mllp.internal.TcpServerAcceptRunnable;
+import 
org.apache.camel.component.mllp.internal.TcpServerConsumerValidationRunnable;
+import org.apache.camel.component.mllp.internal.TcpServerAcceptThread;
 import org.apache.camel.component.mllp.internal.TcpSocketConsumerRunnable;
 import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerator;
 
 /**
  * The MLLP consumer.
  */
 @ManagedResource(description = "MLLP Producer")
 public class MllpTcpServerConsumer extends DefaultConsumer {
-    final ExecutorService acceptExecutor;
+    final ExecutorService validationExecutor;
     final ExecutorService consumerExecutor;
-    TcpServerAcceptRunnable acceptRunnable;
+    TcpServerAcceptThread acceptThread;
     Map<TcpSocketConsumerRunnable, Long> consumerRunnables = new 
ConcurrentHashMap<>();
 
 
     public MllpTcpServerConsumer(MllpEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         log.trace("MllpTcpServerConsumer(endpoint, processor)");
-        // this.endpoint = endpoint;
-        // this.configuration = endpoint.getConfiguration();
 
-        acceptExecutor = new ThreadPoolExecutor(1, 1, 0L, 
TimeUnit.MILLISECONDS, new SynchronousQueue<>());
+        validationExecutor = Executors.newCachedThreadPool();
         consumerExecutor = new ThreadPoolExecutor(1, 
getConfiguration().getMaxConcurrentConsumers(), 
getConfiguration().getAcceptTimeout(), TimeUnit.MILLISECONDS, new 
SynchronousQueue<>());
     }
 
@@ -111,9 +110,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
             consumerClientSocketThread.stop();
         }
 
-        acceptRunnable.stop();
+        acceptThread.interrupt();
 
-        acceptRunnable = null;
+        acceptThread = null;
 
         super.doStop();
     }
@@ -171,8 +170,10 @@ public class MllpTcpServerConsumer extends DefaultConsumer 
{
             }
         } while (!serverSocket.isBound());
 
-        acceptRunnable = new TcpServerAcceptRunnable(this, serverSocket);
-        acceptExecutor.submit(acceptRunnable);
+        // acceptRunnable = new TcpServerConsumerValidationRunnable(this, 
serverSocket);
+        // validationExecutor.submit(acceptRunnable);
+        acceptThread = new TcpServerAcceptThread(this, serverSocket);
+        acceptThread.start();
 
         super.doStart();
     }
@@ -181,7 +182,10 @@ public class MllpTcpServerConsumer extends DefaultConsumer 
{
     protected void doShutdown() throws Exception {
         super.doShutdown();
         consumerExecutor.shutdownNow();
-        acceptExecutor.shutdownNow();
+        if (acceptThread != null) {
+            acceptThread.interrupt();
+        }
+        validationExecutor.shutdownNow();
     }
 
     public MllpConfiguration getConfiguration() {
@@ -192,8 +196,21 @@ public class MllpTcpServerConsumer extends DefaultConsumer 
{
         return consumerRunnables;
     }
 
-    public void startConsumer(Socket clientSocket) {
-        TcpSocketConsumerRunnable client = new TcpSocketConsumerRunnable(this, 
clientSocket);
+    public void validateConsumer(Socket clientSocket) {
+        MllpSocketBuffer mllpBuffer = new MllpSocketBuffer(getEndpoint());
+        TcpServerConsumerValidationRunnable client = new 
TcpServerConsumerValidationRunnable(this, clientSocket, mllpBuffer);
+
+        try {
+            log.info("Validating consumer for Socket {}", clientSocket);
+            validationExecutor.submit(client);
+        } catch (RejectedExecutionException rejectedExecutionEx) {
+            log.warn("Cannot validate consumer - max validations already 
active");
+            mllpBuffer.resetSocket(clientSocket);
+        }
+    }
+
+    public void startConsumer(Socket clientSocket, MllpSocketBuffer 
mllpBuffer) {
+        TcpSocketConsumerRunnable client = new TcpSocketConsumerRunnable(this, 
clientSocket, mllpBuffer);
 
         consumerRunnables.put(client, System.currentTimeMillis());
         try {
@@ -205,5 +222,15 @@ public class MllpTcpServerConsumer extends DefaultConsumer 
{
         }
     }
 
+
+    @Override
+    public void handleException(Throwable t) {
+        super.handleException(t);
+    }
+
+    @Override
+    public void handleException(String message, Throwable t) {
+        super.handleException(message, t);
+    }
 }
 
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
index 8839045..faf6d63 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
@@ -170,8 +170,11 @@ public class MllpSocketBuffer {
         endOfBlockIndex = -1;
     }
 
-
     public synchronized void readFrom(Socket socket) throws 
MllpSocketException, SocketTimeoutException {
+        readFrom(socket, endpoint.getConfiguration().getReceiveTimeout(), 
endpoint.getConfiguration().getReadTimeout());
+    }
+
+    public synchronized void readFrom(Socket socket, int receiveTimeout, int 
readTimeout) throws MllpSocketException, SocketTimeoutException {
         log.trace("Entering readFrom ...");
         if (socket != null && socket.isConnected() && !socket.isClosed()) {
             ensureCapacity(MIN_BUFFER_SIZE);
@@ -179,11 +182,11 @@ public class MllpSocketBuffer {
             try {
                 InputStream socketInputStream = socket.getInputStream();
 
-                
socket.setSoTimeout(endpoint.getConfiguration().getReceiveTimeout());
+                socket.setSoTimeout(receiveTimeout);
 
                 readSocketInputStream(socketInputStream, socket);
                 if (!hasCompleteEnvelope()) {
-                    
socket.setSoTimeout(endpoint.getConfiguration().getReadTimeout());
+                    socket.setSoTimeout(readTimeout);
 
                     while (!hasCompleteEnvelope()) {
                         ensureCapacity(Math.max(MIN_BUFFER_SIZE, 
socketInputStream.available()));
@@ -249,7 +252,7 @@ public class MllpSocketBuffer {
         log.trace("Exiting writeTo ...");
     }
 
-    public synchronized byte toByteArray()[] {
+    public synchronized byte[] toByteArray() {
         if (availableByteCount > 0) {
             return Arrays.copyOf(buffer, availableByteCount);
         }
@@ -257,7 +260,7 @@ public class MllpSocketBuffer {
         return null;
     }
 
-    public synchronized byte toByteArrayAndReset()[] {
+    public synchronized byte[] toByteArrayAndReset() {
         byte[] answer = toByteArray();
 
         reset();
@@ -293,6 +296,22 @@ public class MllpSocketBuffer {
         return "";
     }
 
+    public synchronized String toStringAndReset() {
+        String answer = toString();
+
+        reset();
+
+        return answer;
+    }
+
+    public synchronized String toStringAndReset(String charsetName) {
+        String answer = toString(charsetName);
+
+        reset();
+
+        return answer;
+    }
+
     /**
      * Convert the entire contents of the buffer (including enveloping 
characters) to a print-friendly
      * String representation.
@@ -534,8 +553,10 @@ public class MllpSocketBuffer {
     }
 
     void updateIndexes(int b, int indexOffset) {
-        if (startOfBlockIndex < 0 && b == 
MllpProtocolConstants.START_OF_BLOCK) {
-            startOfBlockIndex = availableByteCount + indexOffset;
+        if (startOfBlockIndex < 0) {
+            if (b == MllpProtocolConstants.START_OF_BLOCK) {
+                startOfBlockIndex = availableByteCount + indexOffset;
+            }
         } else if (endOfBlockIndex < 0 && b == 
MllpProtocolConstants.END_OF_BLOCK) {
             endOfBlockIndex = availableByteCount + indexOffset;
         }
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptRunnable.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java
similarity index 88%
rename from 
components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptRunnable.java
rename to 
components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java
index 568a56a..f126286 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptRunnable.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java
@@ -31,16 +31,16 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
 /**
- * Runnable to handle the ServerSocket.accept requests
+ * Thread to handle the ServerSocket.accept requests and hand each accepted 
socket to the consumer, which submits it to its validation executor.
  */
-public class TcpServerAcceptRunnable implements Runnable {
+public class TcpServerAcceptThread extends Thread {
     Logger log = LoggerFactory.getLogger(this.getClass());
 
     MllpTcpServerConsumer consumer;
     ServerSocket serverSocket;
     boolean running;
 
-    public TcpServerAcceptRunnable(MllpTcpServerConsumer consumer, 
ServerSocket serverSocket) {
+    public TcpServerAcceptThread(MllpTcpServerConsumer consumer, ServerSocket 
serverSocket) {
         this.consumer = consumer;
         this.serverSocket = serverSocket;
     }
@@ -97,7 +97,6 @@ public class TcpServerAcceptRunnable implements Runnable {
                 Socket socket = null;
                 try {
                     socket = serverSocket.accept();
-                    
consumer.getEndpoint().updateLastConnectionEstablishedTicks();
                 } catch (SocketTimeoutException timeoutEx) {
                     // Didn't get a new connection - keep waiting for one
                     log.debug("Timeout waiting for client connection - keep 
listening");
@@ -126,7 +125,7 @@ public class TcpServerAcceptRunnable implements Runnable {
 
                 if (MllpSocketBuffer.isConnectionValid(socket)) {
                     // Try and avoid starting client threads for things like 
security scans and load balancer probes
-                    consumer.startConsumer(socket);
+                    consumer.validateConsumer(socket);
                 }
             }
         } finally {
@@ -144,7 +143,18 @@ public class TcpServerAcceptRunnable implements Runnable {
         }
     }
 
-    public void stop() {
-        running = false;
+    @Override
+    public void interrupt() {
+        this.running = false;
+        super.interrupt();
+        if (null != serverSocket) {
+            if (serverSocket.isBound()) {
+                try {
+                    serverSocket.close();
+                } catch (IOException ioEx) {
+                    log.warn("Exception encountered closing ServerSocket in 
interrupt() method - ignoring", ioEx);
+                }
+            }
+        }
     }
 }
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerConsumerValidationRunnable.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerConsumerValidationRunnable.java
new file mode 100644
index 0000000..066b3db
--- /dev/null
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerConsumerValidationRunnable.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.mllp.internal;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+
+import org.apache.camel.Route;
+import org.apache.camel.component.mllp.MllpSocketException;
+import org.apache.camel.component.mllp.MllpTcpServerConsumer;
+import org.apache.camel.impl.MDCUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Runnable to do the initial read on an accepted client Socket and decide whether to start a consumer for it or reset it
+ */
+public class TcpServerConsumerValidationRunnable implements Runnable {
+    final Socket clientSocket;
+    final MllpSocketBuffer mllpBuffer;
+
+    Logger log = LoggerFactory.getLogger(this.getClass());
+    MllpTcpServerConsumer consumer;
+
+    private final String localAddress;
+    private final String remoteAddress;
+    private final String combinedAddress;
+
+    public TcpServerConsumerValidationRunnable(MllpTcpServerConsumer consumer, 
Socket clientSocket, MllpSocketBuffer mllpBuffer) {
+        this.consumer = consumer;
+        // this.setName(createThreadName(clientSocket));
+        this.clientSocket = clientSocket;
+
+        SocketAddress localSocketAddress = 
clientSocket.getLocalSocketAddress();
+        if (localSocketAddress != null) {
+            localAddress = localSocketAddress.toString();
+        } else {
+            localAddress = null;
+        }
+
+        SocketAddress remoteSocketAddress = 
clientSocket.getRemoteSocketAddress();
+        if (remoteSocketAddress != null) {
+            remoteAddress = remoteSocketAddress.toString();
+        } else {
+            remoteAddress = null;
+        }
+
+        combinedAddress = 
MllpSocketBuffer.formatAddressString(remoteSocketAddress, localSocketAddress);
+
+
+        try {
+            if (consumer.getConfiguration().hasKeepAlive()) {
+                
this.clientSocket.setKeepAlive(consumer.getConfiguration().getKeepAlive());
+            }
+            if (consumer.getConfiguration().hasTcpNoDelay()) {
+                
this.clientSocket.setTcpNoDelay(consumer.getConfiguration().getTcpNoDelay());
+            }
+            if (consumer.getConfiguration().hasReceiveBufferSize()) {
+                
this.clientSocket.setReceiveBufferSize(consumer.getConfiguration().getReceiveBufferSize());
+            }
+            if (consumer.getConfiguration().hasSendBufferSize()) {
+                
this.clientSocket.setSendBufferSize(consumer.getConfiguration().getSendBufferSize());
+            }
+
+            this.clientSocket.setSoLinger(false, -1);
+
+            // Initial Read Timeout
+            
this.clientSocket.setSoTimeout(consumer.getConfiguration().getReceiveTimeout());
+        } catch (IOException initializationException) {
+            throw new IllegalStateException("Failed to initialize " + 
this.getClass().getSimpleName(), initializationException);
+        }
+
+        if (mllpBuffer == null) {
+            this.mllpBuffer = new MllpSocketBuffer(consumer.getEndpoint());
+        } else {
+            this.mllpBuffer = mllpBuffer;
+        }
+    }
+
+    /**
+     * derive a thread name from the class name, the component URI and the 
connection information
+     * <p/>
+     * The String will be in the format <class name>[endpoint key] - [local 
socket address] -> [remote socket address]
+     *
+     * @return the thread name
+     */
+    String createThreadName(Socket socket) {
+        // Get the URI without options
+        String fullEndpointKey = consumer.getEndpoint().getEndpointKey();
+        String endpointKey;
+        if (fullEndpointKey.contains("?")) {
+            endpointKey = fullEndpointKey.substring(0, 
fullEndpointKey.indexOf('?'));
+        } else {
+            endpointKey = fullEndpointKey;
+        }
+
+        // Now put it all together
+        return String.format("%s[%s] - %s", this.getClass().getSimpleName(), 
endpointKey, combinedAddress);
+    }
+
+    /**
+     * Do the initial read on the Socket and try to determine if it has HL7 
data, junk, or nothing.
+     */
+    @Override
+    public void run() {
+        String originalThreadName = Thread.currentThread().getName();
+        Thread.currentThread().setName(createThreadName(clientSocket));
+        MDC.put(MDCUnitOfWork.MDC_CAMEL_CONTEXT_ID, 
consumer.getEndpoint().getCamelContext().getName());
+
+        Route route = consumer.getRoute();
+        if (route != null) {
+            String routeId = route.getId();
+            if (routeId != null) {
+                MDC.put(MDCUnitOfWork.MDC_ROUTE_ID, route.getId());
+            }
+        }
+
+        log.debug("Checking {} for data", combinedAddress);
+
+        try {
+            mllpBuffer.readFrom(clientSocket, 500, 50);
+            if (mllpBuffer.hasCompleteEnvelope()  || 
mllpBuffer.hasStartOfBlock()) {
+                consumer.startConsumer(clientSocket, mllpBuffer);
+            } else if (!mllpBuffer.isEmpty()) {
+                // We have some leading out-of-band data but no START_OF_BLOCK
+                log.info("Ignoring out-of-band data on initial read: {}", 
mllpBuffer.toStringAndReset());
+                mllpBuffer.resetSocket(clientSocket);
+            }
+        } catch (MllpSocketException socketEx) {
+            // TODO:  The socket is invalid for some reason
+            if (!mllpBuffer.isEmpty()) {
+                log.warn("Exception encountered receiving complete message: ", 
mllpBuffer.toStringAndReset());
+            }
+            mllpBuffer.resetSocket(clientSocket);
+        } catch (SocketTimeoutException timeoutEx) {
+            if (mllpBuffer.isEmpty()) {
+                log.debug("Initial read timed-out but no data was read - 
starting consumer");
+                consumer.startConsumer(clientSocket, mllpBuffer);
+            } else {
+                log.warn("Timeout receiving complete message: {}", 
mllpBuffer.toStringAndReset());
+                mllpBuffer.resetSocket(clientSocket);
+            }
+        } finally {
+            Thread.currentThread().setName(originalThreadName);
+        }
+    }
+
+    public void closeSocket() {
+        mllpBuffer.closeSocket(clientSocket);
+    }
+
+    public void closeSocket(String logMessage) {
+        mllpBuffer.closeSocket(clientSocket, logMessage);
+    }
+
+    public void resetSocket() {
+        mllpBuffer.resetSocket(clientSocket);
+    }
+
+    public void resetSocket(String logMessage) {
+        mllpBuffer.resetSocket(clientSocket, logMessage);
+    }
+
+    public String getLocalAddress() {
+        return localAddress;
+    }
+
+    public String getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    public String getCombinedAddress() {
+        return combinedAddress;
+    }
+
+}
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java
index 0f0aa56..8940867 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java
@@ -61,7 +61,7 @@ public class TcpSocketConsumerRunnable implements Runnable {
     private final String remoteAddress;
     private final String combinedAddress;
 
-    public TcpSocketConsumerRunnable(MllpTcpServerConsumer consumer, Socket 
clientSocket) {
+    public TcpSocketConsumerRunnable(MllpTcpServerConsumer consumer, Socket 
clientSocket, MllpSocketBuffer mllpBuffer) {
         this.consumer = consumer;
         // this.setName(createThreadName(clientSocket));
         this.clientSocket = clientSocket;
@@ -105,7 +105,11 @@ public class TcpSocketConsumerRunnable implements Runnable 
{
             throw new IllegalStateException("Failed to initialize " + 
this.getClass().getSimpleName(), initializationException);
         }
 
-        mllpBuffer = new MllpSocketBuffer(consumer.getEndpoint());
+        if (mllpBuffer == null) {
+            this.mllpBuffer = new MllpSocketBuffer(consumer.getEndpoint());
+        } else {
+            this.mllpBuffer = mllpBuffer;
+        }
     }
 
     /**
@@ -448,6 +452,7 @@ public class TcpSocketConsumerRunnable implements Runnable {
                 throw runtimeEx;
             } catch (Exception ex) {
                 log.error("Unexpected exception processing exchange", ex);
+                exchange.setException(ex);
             }
         } catch (Exception uowEx) {
             // TODO:  Handle this correctly
@@ -483,8 +488,15 @@ public class TcpSocketConsumerRunnable implements Runnable 
{
 
         log.debug("Starting {} for {}", this.getClass().getSimpleName(), 
combinedAddress);
         try {
+            byte[] hl7MessageBytes = null;
+            if (mllpBuffer.hasCompleteEnvelope()) {
+                // If we got a complete message on the validation read, 
process it
+                hl7MessageBytes = mllpBuffer.toMllpPayload();
+                mllpBuffer.reset();
+                processMessage(hl7MessageBytes);
+            }
+
             while (running && null != clientSocket && 
clientSocket.isConnected() && !clientSocket.isClosed()) {
-                byte[] hl7MessageBytes = null;
                 log.debug("Checking for data ....");
                 try {
                     mllpBuffer.readFrom(clientSocket);
@@ -519,31 +531,16 @@ public class TcpSocketConsumerRunnable implements 
Runnable {
                                 
consumer.getEndpoint().doConnectionClose(clientSocket, true, log);
                             }
                         }
-                        log.info("No data received - ignoring timeout");
+                        log.debug("No data received - ignoring timeout");
                     } else {
                         mllpBuffer.resetSocket(clientSocket);
-                        if (consumer.getEndpoint().isBridgeErrorHandler()) {
-                            Exchange exchange = 
consumer.getEndpoint().createExchange(ExchangePattern.InOut);
-                            exchange.setException(new 
MllpInvalidMessageException("Timeout receiving complete payload", 
mllpBuffer.toByteArray()));
-                            log.warn("Exception encountered reading payload - 
sending exception to route", exchange.getException());
-                            try {
-                                consumer.getProcessor().process(exchange);
-                            } catch (Exception e) {
-                                log.error("Exception encountered processing 
exchange with exception encounter reading payload", e);
-                            }
-                        } else {
-                            log.error("Timeout receiving complete payload", 
new MllpInvalidMessageException("Timeout receiving complete payload", 
mllpBuffer.toByteArray(), timeoutEx));
-                        }
+                        new MllpInvalidMessageException("Timeout receiving 
complete message payload", mllpBuffer.toByteArrayAndReset(), timeoutEx);
+                        consumer.handleException(new 
MllpInvalidMessageException("Timeout receiving complete message payload", 
mllpBuffer.toByteArrayAndReset(), timeoutEx));
                     }
                 } catch (MllpSocketException mllpSocketEx) {
+                    mllpBuffer.resetSocket(clientSocket);
                     if (!mllpBuffer.isEmpty()) {
-                        Exchange exchange = 
consumer.getEndpoint().createExchange(ExchangePattern.InOut);
-                        exchange.setException(new 
MllpReceiveException("Exception encountered reading payload", 
mllpBuffer.toByteArrayAndReset(), mllpSocketEx));
-                        try {
-                            consumer.getProcessor().process(exchange);
-                        } catch (Exception ignoredEx) {
-                            log.error("Ingnoring exception encountered 
processing exchange with exception encounter reading payload", ignoredEx);
-                        }
+                        consumer.handleException(new 
MllpReceiveException("Exception encountered reading payload", 
mllpBuffer.toByteArrayAndReset(), mllpSocketEx));
                     } else {
                         log.warn("Ignoring exception encountered checking for 
data", mllpSocketEx);
                     }
@@ -558,6 +555,8 @@ public class TcpSocketConsumerRunnable implements Runnable {
             Thread.currentThread().setName(originalThreadName);
             MDC.remove(MDCUnitOfWork.MDC_ROUTE_ID);
             MDC.remove(MDCUnitOfWork.MDC_CAMEL_CONTEXT_ID);
+
+            mllpBuffer.resetSocket(clientSocket);
         }
     }
 
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java
index 9121136..0539e2e 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java
@@ -42,7 +42,10 @@ import org.junit.Rule;
 import org.junit.Test;
 
 public class MllpTcpClientProducerIdleConnectionTimeoutTest extends 
CamelTestSupport {
-    static final int IDLE_TIMEOUT = 10000;
+    static final int CONNECT_TIMEOUT = 500;
+    static final int RECEIVE_TIMEOUT = 1000;
+    static final int READ_TIMEOUT = 500;
+    static final int IDLE_TIMEOUT = RECEIVE_TIMEOUT * 3;
 
     @Rule
     public MllpServerResource mllpServer = new MllpServerResource("localhost", 
AvailablePortFinder.getNextAvailable());
@@ -90,7 +93,8 @@ public class MllpTcpClientProducerIdleConnectionTimeoutTest 
extends CamelTestSup
 
                 from(source.getDefaultEndpoint()).routeId(routeId)
                     .log(LoggingLevel.INFO, routeId, "Sending Message")
-                    .toF("mllp://%s:%d?idleTimeout=%s", 
mllpServer.getListenHost(), mllpServer.getListenPort(), IDLE_TIMEOUT)
+                    
.toF("mllp://%s:%d?connectTimeout=%d&receiveTimeout=%d&readTimeout=%d&idleTimeout=%s",
 mllpServer.getListenHost(), mllpServer.getListenPort(),
+                        CONNECT_TIMEOUT, RECEIVE_TIMEOUT, READ_TIMEOUT, 
IDLE_TIMEOUT)
                     .log(LoggingLevel.INFO, routeId, "Received 
Acknowledgement")
                     .to(complete);
             }
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
index b7b4d99..c366465 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
@@ -40,7 +40,8 @@ import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 
 public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport {
-    static final int RECEIVE_TIMEOUT = 500;
+    static final int RECEIVE_TIMEOUT = 1000;
+    static final int READ_TIMEOUT = 500;
 
     @Rule
     public MllpClientResource mllpClient = new MllpClientResource();
@@ -68,7 +69,7 @@ public class MllpTcpServerConsumerConnectionTest extends 
CamelTestSupport {
             String routeId = "mllp-receiver";
 
             public void configure() {
-                fromF("mllp://%s:%d?autoAck=false", mllpClient.getMllpHost(), 
mllpClient.getMllpPort())
+                
fromF("mllp://%s:%d?receiveTimeout=%d&readTimeout=%d&autoAck=false", 
mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT, 
READ_TIMEOUT)
                     .log(LoggingLevel.INFO, routeId, "Receiving: ${body}")
                     .to(result);
             }
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java
index 3645314..62140591 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.camel.component.mllp;
 
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.test.mllp.Hl7TestMessageGenerator;
+
 public class MllpTcpServerConsumerOptionalEndOfDataWithValidationTest extends 
TcpServerConsumerEndOfDataAndValidationTestSupport {
 
     @Override
@@ -62,7 +67,13 @@ public class 
MllpTcpServerConsumerOptionalEndOfDataWithValidationTest extends Tc
     public void testMessageContainingEmbeddedEndOfBlock() throws Exception {
         expectedInvalidCount = 1;
 
-        runMessageContainingEmbeddedEndOfBlock();
+        setExpectedCounts();
+
+        NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create();
+
+        
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN",
 "EVN" + MllpProtocolConstants.END_OF_BLOCK));
+
+        assertTrue("Exchange should have completed", done.matches(5, 
TimeUnit.SECONDS));
     }
 
     @Override
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java
index 66087da..cc1c7d4 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.camel.component.mllp;
 
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.component.mllp.internal.Hl7Util;
 
 import org.apache.camel.test.mllp.Hl7TestMessageGenerator;
@@ -44,6 +47,8 @@ public class 
MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest extends
 
     @Override
     public void testNthInvalidMessage() throws Exception {
+        expectedFailedCount = 1;
+
         runNthInvalidMessage();
     }
 
@@ -65,8 +70,13 @@ public class 
MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest extends
     public void testMessageContainingEmbeddedEndOfBlock() throws Exception {
         expectedCompleteCount = 1;
 
-        runMessageContainingEmbeddedEndOfBlock();
-    }
+        setExpectedCounts();
+
+        NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create();
+
+        
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN",
 "EVN" + MllpProtocolConstants.END_OF_BLOCK));
+
+        assertTrue("Exchange should have completed", done.matches(5, 
TimeUnit.SECONDS));    }
 
     @Override
     public void testNthMessageContainingEmbeddedEndOfBlock() throws Exception {
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java
index d8bee66..bb7666f 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.camel.component.mllp;
 
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.test.mllp.Hl7TestMessageGenerator;
+
 public class MllpTcpServerConsumerRequiredEndOfDataWithValidationTest extends 
TcpServerConsumerEndOfDataAndValidationTestSupport {
 
     @Override
@@ -59,9 +64,15 @@ public class 
MllpTcpServerConsumerRequiredEndOfDataWithValidationTest extends Tc
 
     @Override
     public void testMessageContainingEmbeddedEndOfBlock() throws Exception {
-        expectedInvalidCount = 1;
+        //expectedInvalidCount = 1;
+
+        setExpectedCounts();
+
+        NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create();
+
+        
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN",
 "EVN" + MllpProtocolConstants.END_OF_BLOCK));
 
-        runMessageContainingEmbeddedEndOfBlock();
+        assertFalse("Exchange should not have completed", done.matches(5, 
TimeUnit.SECONDS));
     }
 
     @Override
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java
index 458a7bf..4abe439 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java
@@ -37,11 +37,15 @@ public class 
MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest extends
 
     @Override
     public void testInvalidMessage() throws Exception {
+        expectedFailedCount = 1;
+
         runNthInvalidMessage();
     }
 
     @Override
     public void testNthInvalidMessage() throws Exception {
+        expectedFailedCount = 1;
+
         runNthInvalidMessage();
     }
 
@@ -61,9 +65,13 @@ public class 
MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest extends
 
     @Override
     public void testMessageContainingEmbeddedEndOfBlock() throws Exception {
-        expectedInvalidCount = 1;
+        setExpectedCounts();
+
+        NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create();
+
+        
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN",
 "EVN" + MllpProtocolConstants.END_OF_BLOCK));
 
-        runMessageContainingEmbeddedEndOfBlock();
+        assertFalse("Exchange should not have completed", done.matches(5, 
TimeUnit.SECONDS));
     }
 
     @Override
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java
index f659685..4893f7e 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java
@@ -35,6 +35,9 @@ import org.junit.Rule;
 import org.junit.Test;
 
 public abstract class TcpClientProducerEndOfDataAndValidationTestSupport 
extends CamelTestSupport {
+    static final int RECEIVE_TIMEOUT = 1000;
+    static final int READ_TIMEOUT = 500;
+
     static final String TEST_MESSAGE =
         
"MSH|^~\\&|ADT|EPIC|JCAPS|CC|20161206193919|RISTECH|ADT^A08|00001|D|2.3^^|||||||"
 + '\r'
             + 
"EVN|A08|20150107161440||REG_UPDATE_SEND_VISIT_MESSAGES_ON_PATIENT_CHANGES|RISTECH^RADIOLOGY^TECHNOLOGIST^^^^^^UCLA^^^^^RRMC||"
 + '\r'
@@ -158,7 +161,8 @@ public abstract class 
TcpClientProducerEndOfDataAndValidationTestSupport extends
 
                 from(source.getDefaultEndpoint()).routeId(routeId)
                     .log(LoggingLevel.INFO, routeId, "Sending Message")
-                    
.toF("mllp://%s:%d?validatePayload=%b&requireEndOfData=%b", 
mllpServer.getListenHost(), mllpServer.getListenPort(), validatePayload(), 
requireEndOfData())
+                    
.toF("mllp://%s:%d?receiveTimeout=%d&readTimeout=%d&validatePayload=%b&requireEndOfData=%b",
 mllpServer.getListenHost(), mllpServer.getListenPort(),
+                        RECEIVE_TIMEOUT, READ_TIMEOUT, validatePayload(), 
requireEndOfData())
                     .log(LoggingLevel.INFO, routeId, "Received Acknowledgement")
                     .to(aa);
             }
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
index 094ff08..fe6e6f7 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
@@ -17,6 +17,9 @@
 
 package org.apache.camel.component.mllp;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+import java.net.SocketException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
@@ -30,7 +33,6 @@ import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit.rule.mllp.MllpClientResource;
 import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException;
-import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceTimeoutException;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.camel.test.mllp.Hl7TestMessageGenerator;
 import org.junit.Rule;
@@ -38,7 +40,8 @@ import org.junit.Test;
 
 public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport 
extends CamelTestSupport {
     static final int CONNECT_TIMEOUT = 500;
-    static final int RESPONSE_TIMEOUT = 5000;
+    static final int RECEIVE_TIMEOUT = 1000;
+    static final int READ_TIMEOUT = 500;
 
     @Rule
     public MllpClientResource mllpClient = new MllpClientResource();
@@ -46,10 +49,14 @@ public abstract class 
TcpServerConsumerEndOfDataAndValidationTestSupport extends
     @EndpointInject(uri = "mock://complete")
     MockEndpoint complete;
 
+    @EndpointInject(uri = "mock://failed")
+    MockEndpoint failed;
+
     @EndpointInject(uri = "mock://invalid-ex")
     MockEndpoint invalid;
 
     int expectedCompleteCount;
+    int expectedFailedCount;
     int expectedInvalidCount;
 
     @Override
@@ -76,8 +83,11 @@ public abstract class 
TcpServerConsumerEndOfDataAndValidationTestSupport extends
                 onException(MllpInvalidMessageException.class)
                     .to(invalid);
 
-                
fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d&validatePayload=%b&requireEndOfData=%b",
-                    mllpClient.getMllpHost(), mllpClient.getMllpPort(), 
CONNECT_TIMEOUT, RESPONSE_TIMEOUT, validatePayload(), requireEndOfData())
+                onCompletion().onFailureOnly()
+                    .to(failed);
+
+                
fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d&readTimeout=%d&validatePayload=%b&requireEndOfData=%b",
+                    mllpClient.getMllpHost(), mllpClient.getMllpPort(), 
CONNECT_TIMEOUT, RECEIVE_TIMEOUT, READ_TIMEOUT, validatePayload(), 
requireEndOfData())
                     .routeId(routeId)
                     .log(LoggingLevel.INFO, routeId, "Test route received message")
                     .to(complete);
@@ -91,6 +101,7 @@ public abstract class 
TcpServerConsumerEndOfDataAndValidationTestSupport extends
 
     protected void setExpectedCounts() {
         complete.expectedMessageCount(expectedCompleteCount);
+        failed.expectedMessageCount(expectedFailedCount);
         invalid.expectedMessageCount(expectedInvalidCount);
     }
 
@@ -160,7 +171,7 @@ public abstract class 
TcpServerConsumerEndOfDataAndValidationTestSupport extends
         log.info("Sending TEST_MESSAGE_2");
         String acknowledgement2 = 
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(2));
 
-        assertTrue("First two normal exchanges did not complete", 
notify1.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS));
+        assertTrue("First two normal exchanges did not complete", 
notify1.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS));
 
         log.info("Sending TEST_MESSAGE_3");
         mllpClient.setSendEndOfBlock(false);
@@ -183,7 +194,7 @@ public abstract class 
TcpServerConsumerEndOfDataAndValidationTestSupport extends
         log.info("Sending TEST_MESSAGE_5");
         String acknowledgement5 = 
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(5));
 
-        assertTrue("Remaining exchanges did not complete", 
notify2.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS));
+        assertTrue("Remaining exchanges did not complete", 
notify2.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS));
 
         assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
 
@@ -207,27 +218,47 @@ public abstract class 
TcpServerConsumerEndOfDataAndValidationTestSupport extends
         // Send one message to establish the connection and start the ConsumerClientSocketThread
         mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage());
 
-        assertTrue("One exchange should have completed", 
oneDone.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS));
+        assertTrue("One exchange should have completed", 
oneDone.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS));
 
         mllpClient.setSendEndOfBlock(false);
         mllpClient.setSendEndOfData(false);
 
-        mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage());
+        
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage());
 
-        assertTrue("Two exchanges should have completed", 
twoDone.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS));
+        assertTrue("Two exchanges should have completed", 
twoDone.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS));
     }
 
 
     @Test
     public void testInitialMessageReadTimeout() throws Exception {
-        expectedInvalidCount = 1;
+        expectedCompleteCount = 1;
 
         setExpectedCounts();
 
         mllpClient.setSendEndOfBlock(false);
         mllpClient.setSendEndOfData(false);
 
-        mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage());
+        log.info("Sending first message");
+        
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage(10001));
+
+        Thread.sleep(RECEIVE_TIMEOUT * 5);
+
+        mllpClient.setSendEndOfBlock(true);
+        mllpClient.setSendEndOfData(true);
+
+        try {
+            log.info("Attempting to send second message");
+            String acknowledgement = 
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(10002));
+            assertEquals("If the send doesn't throw an exception, the acknowledgement should be empty", "", acknowledgement);
+        } catch (MllpJUnitResourceException expected) {
+            assertThat("If the send throws an exception, the cause should be a SocketException", expected.getCause(), instanceOf(SocketException.class));
+        }
+
+        mllpClient.disconnect();
+        mllpClient.connect();
+
+        log.info("Sending third message");
+        String acknowledgement = 
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(10003));
     }
 
     @Test
@@ -306,16 +337,6 @@ public abstract class 
TcpServerConsumerEndOfDataAndValidationTestSupport extends
     @Test
     public abstract void testMessageContainingEmbeddedEndOfBlock() throws 
Exception;
 
-    protected void runMessageContainingEmbeddedEndOfBlock() throws Exception {
-        setExpectedCounts();
-
-        NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create();
-
-        
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN",
 "EVN" + MllpProtocolConstants.END_OF_BLOCK));
-
-        assertTrue("Exchange should have completed", done.matches(15, 
TimeUnit.SECONDS));
-    }
-
     @Test
     public abstract void testInvalidMessageContainingEmbeddedEndOfBlock() 
throws Exception;
 
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java
index 092be01..cd87fc6 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java
@@ -403,7 +403,7 @@ public class MllpSocketBufferTest extends 
SocketBufferTestSupport {
         assertFalse("Unexpected initial value", instance.hasEndOfBlock());
 
         instance.write(MllpProtocolConstants.END_OF_BLOCK);
-        assertTrue(instance.hasEndOfBlock());
+        assertFalse("START_OF_BLOCK before an END_OF_BLOCK", 
instance.hasEndOfBlock());
 
         instance.reset();
         assertFalse(instance.hasEndOfBlock());
@@ -457,7 +457,7 @@ public class MllpSocketBufferTest extends 
SocketBufferTestSupport {
         assertFalse(instance.hasEndOfData());
 
         instance.write(MllpProtocolConstants.END_OF_DATA);
-        assertTrue(instance.hasEndOfData());
+        assertFalse("Need a START_OF_BLOCK before the END_OF_DATA",  
instance.hasEndOfData());
 
         instance.reset();
         assertFalse(instance.hasEndOfData());
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java
index 1f21bbf..903669e 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java
@@ -57,7 +57,7 @@ public class MllpSocketBufferWriteTest extends 
SocketBufferTestSupport {
 
         assertEquals(1, instance.size());
         assertEquals(-1, instance.startOfBlockIndex);
-        assertEquals(0, instance.endOfBlockIndex);
+        assertEquals(-1, instance.endOfBlockIndex);
     }
 
     /**
@@ -142,7 +142,7 @@ public class MllpSocketBufferWriteTest extends 
SocketBufferTestSupport {
 
         assertEquals(6, instance.size());
         assertEquals(-1, instance.startOfBlockIndex);
-        assertEquals(4, instance.endOfBlockIndex);
+        assertEquals(-1, instance.endOfBlockIndex);
     }
 
     /**
diff --git a/components/camel-mllp/src/test/resources/log4j2.properties 
b/components/camel-mllp/src/test/resources/log4j2.properties
index 5c96df2..4179677 100644
--- a/components/camel-mllp/src/test/resources/log4j2.properties
+++ b/components/camel-mllp/src/test/resources/log4j2.properties
@@ -31,4 +31,4 @@ rootLogger.appenderRef.file.ref = file
 
 loggers = mllp
 logger.mllp.name = org.apache.camel.component.mllp
-# logger.mllp.level = TRACE
+logger.mllp.level = DEBUG

-- 
To stop receiving notification emails like this one, please contact
qu...@apache.org.

Reply via email to