This is an automated email from the ASF dual-hosted git repository.

quinn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 14d2cf99cd0fbd85ab5a8383e4516acdffb65a7b
Author: Quinn Stevenson <qu...@apache.org>
AuthorDate: Thu Mar 15 07:49:00 2018 -0600

    CAMEL-12325: Correct idleTimeout behaviour for the MllpTcpClientProducer
---
 .../apache/camel/component/mllp/MllpEndpoint.java  |  11 +-
 .../component/mllp/MllpTcpClientProducer.java      | 111 ++++++++++++++-------
 .../component/mllp/internal/MllpSocketBuffer.java  |  10 +-
 .../MllpTcpClientProducerConnectionErrorTest.java  |  53 ++++++----
 4 files changed, 121 insertions(+), 64 deletions(-)

diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index ae8975b..dbadfa8 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -53,7 +53,8 @@ import org.slf4j.LoggerFactory;
  * <p/>
  */
 @ManagedResource(description = "MLLP Endpoint")
-@UriEndpoint(scheme = "mllp", firstVersion = "2.17.0", title = "MLLP", syntax 
= "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = 
"mllp")
+// @UriEndpoint(scheme = "mllp", firstVersion = "2.17.0", title = "MLLP", 
syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, 
label = "mllp")
+@UriEndpoint(scheme = "mllp", title = "MLLP", syntax = "mllp:hostname:port", 
consumerClass = MllpTcpServerConsumer.class, label = "mllp")
 public class MllpEndpoint extends DefaultEndpoint {
     // Use constants from MllpProtocolConstants
     @Deprecated()
@@ -175,6 +176,14 @@ public class MllpEndpoint extends DefaultEndpoint {
         return lastConnectionTerminatedTicks != null ? new 
Date(lastConnectionTerminatedTicks) : null;
     }
 
+    public boolean hasLastConnectionActivityTicks() {
+        return lastConnectionActivityTicks != null && 
lastConnectionActivityTicks > 0;
+    }
+
+    public Long getLastConnectionActivityTicks() {
+        return lastConnectionActivityTicks;
+    }
+
     public void updateLastConnectionActivityTicks() {
         updateLastConnectionActivityTicks(System.currentTimeMillis());
     }
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index 9b7ca63..df5705a 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -39,6 +39,10 @@ import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.component.mllp.internal.Hl7Util;
 import org.apache.camel.component.mllp.internal.MllpSocketBuffer;
 import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
 
 /**
  * The MLLP producer.
@@ -50,7 +54,7 @@ public class MllpTcpClientProducer extends DefaultProducer 
implements Runnable {
     final MllpSocketBuffer mllpBuffer;
 
     ScheduledExecutorService idleTimeoutExecutor;
-    long lastProcessCallTicks = -1;
+    // long lastProcessCallTicks = -1;
 
     private String cachedLocalAddress;
     private String cachedRemoteAddress;
@@ -65,7 +69,7 @@ public class MllpTcpClientProducer extends DefaultProducer 
implements Runnable {
 
     @ManagedAttribute(description = "Last activity time")
     public Date getLastActivityTime() {
-        return new Date(lastProcessCallTicks);
+        return getEndpoint().getLastConnectionActivityTime();
     }
 
     @ManagedAttribute(description = "Connection")
@@ -122,8 +126,8 @@ public class MllpTcpClientProducer extends DefaultProducer 
implements Runnable {
     }
 
     @Override
-    public synchronized void process(Exchange exchange) throws Exception {
-        log.trace("Processing Exchange {}", exchange.getExchangeId());
+    public synchronized void process(Exchange exchange) throws MllpException {
+        log.trace("Processing Exchange {} for {}", exchange.getExchangeId(), 
socket);
         getEndpoint().updateLastConnectionActivityTicks();
 
         Message message = exchange.hasOut() ? exchange.getOut() : 
exchange.getIn();
@@ -158,77 +162,96 @@ public class MllpTcpClientProducer extends 
DefaultProducer implements Runnable {
                 }
             }
 
-            log.debug("Sending message to external system");
-            getEndpoint().updateLastConnectionEstablishedTicks();
+            log.debug("Sending message to external system {}", socket);
 
             try {
                 mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
                 mllpBuffer.writeTo(socket);
             } catch (MllpSocketException writeEx) {
                 // Connection may have been reset - try one more time
-                log.debug("Exception encountered reading acknowledgement - 
attempting reconnect", writeEx);
+                log.debug("Exception encountered writing payload to {} - 
attempting reconnect", writeEx, socket);
                 try {
                     checkConnection();
-                    log.trace("Reconnected succeeded - resending payload");
+                    log.trace("Reconnected succeeded - resending payload to 
{}", socket);
                     try {
                         mllpBuffer.writeTo(socket);
                     } catch (MllpSocketException retryWriteEx) {
-                        exchange.setException(retryWriteEx);
+                        log.warn("Exception encountered attempting to write 
payload to {} after reconnect - sending original exception to exchange", 
socket, retryWriteEx);
+                        exchange.setException(new 
MllpWriteException("Exception encountered writing payload after reconnect", 
mllpBuffer.toByteArrayAndReset(), retryWriteEx));
                     }
                 } catch (IOException reconnectEx) {
-                    log.debug("Reconnected failed - sending exception to 
exchange", reconnectEx);
-                    exchange.setException(reconnectEx);
+                    log.warn("Exception encountered attempting to reconnect - 
sending original exception to exchange", reconnectEx);
+                    exchange.setException(new MllpWriteException("Exception 
encountered writing payload", mllpBuffer.toByteArrayAndReset(), writeEx));
+                    mllpBuffer.resetSocket(socket);
                 }
-
             }
 
             if (exchange.getException() == null) {
-                log.debug("Reading acknowledgement from external system");
+                log.debug("Reading acknowledgement from external system {}", 
socket);
                 try {
                     mllpBuffer.reset();
                     mllpBuffer.readFrom(socket);
                 } catch (MllpSocketException receiveAckEx) {
                     // Connection may have been reset - try one more time
-                    log.debug("Exception encountered reading acknowledgement - 
attempting reconnect", receiveAckEx);
+                    log.debug("Exception encountered reading acknowledgement 
from {} - attempting reconnect", socket, receiveAckEx);
                     try {
                         checkConnection();
                     } catch (IOException reconnectEx) {
-                        log.debug("Reconnected failed - sending original 
exception to exchange", reconnectEx);
+                        log.warn("Exception encountered attempting to 
reconnect after acknowledgement read failure - sending original acknowledgement 
exception to exchange", reconnectEx);
                         exchange.setException(new 
MllpAcknowledgementReceiveException("Exception encountered reading 
acknowledgement", hl7MessageBytes, receiveAckEx));
+                        mllpBuffer.resetSocket(socket);
                     }
 
                     if (exchange.getException() == null) {
-                        log.trace("Reconnected succeeded - resending payload");
+                        log.trace("Reconnected succeeded - resending payload 
to {}", socket);
                         try {
                             mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
                             mllpBuffer.writeTo(socket);
                         } catch (MllpSocketException writeRetryEx) {
-                            exchange.setException(new 
MllpWriteException("Failed to write HL7 message to socket", hl7MessageBytes, 
writeRetryEx));
+                            log.warn("Exception encountered attempting to 
write payload to {} after read failure and successful reconnect - sending 
original exception to exchange",
+                                socket, writeRetryEx);
+                            exchange.setException(new 
MllpWriteException("Exception encountered writing payload after read failure 
and reconnect", hl7MessageBytes, receiveAckEx));
                         }
 
                         if (exchange.getException() == null) {
-                            log.trace("Resend succeeded - reading 
acknowledgement");
+                            log.trace("Resend succeeded - reading 
acknowledgement from {}", socket);
                             try {
                                 mllpBuffer.reset();
                                 mllpBuffer.readFrom(socket);
                             } catch (MllpSocketException secondReceiveEx) {
                                 if (mllpBuffer.isEmpty()) {
-                                    Exception exchangeEx = new 
MllpAcknowledgementReceiveException("Exception encountered receiving 
Acknowledgement", hl7MessageBytes, secondReceiveEx);
+                                    log.warn("Exception encountered reading 
acknowledgement from {} after successful reconnect and resend", socket, 
secondReceiveEx);
+                                    Exception exchangeEx = new 
MllpAcknowledgementReceiveException("Exception encountered receiving 
Acknowledgement", hl7MessageBytes, receiveAckEx);
                                     exchange.setException(exchangeEx);
                                 } else {
                                     byte[] partialAcknowledgment = 
mllpBuffer.toByteArray();
                                     mllpBuffer.reset();
+                                    log.warn("Exception encountered reading a 
complete acknowledgement from {} after successful reconnect and resend", 
socket, secondReceiveEx);
                                     Exception exchangeEx = new 
MllpAcknowledgementReceiveException("Exception encountered receiving complete 
Acknowledgement",
-                                        hl7MessageBytes, 
partialAcknowledgment, secondReceiveEx);
+                                        hl7MessageBytes, 
partialAcknowledgment, receiveAckEx);
                                     exchange.setException(exchangeEx);
                                 }
+                            } catch (SocketTimeoutException 
secondReadTimeoutEx) {
+                                if (mllpBuffer.isEmpty()) {
+                                    log.warn("Timeout receiving HL7 
Acknowledgment from {} after successful reconnect", socket, 
secondReadTimeoutEx);
+                                    exchange.setException(new 
MllpAcknowledgementTimeoutException("Timeout receiving HL7 Acknowledgement 
after successful reconnect",
+                                        hl7MessageBytes, secondReadTimeoutEx));
+                                } else {
+                                    log.warn("Timeout receiving complete HL7 
Acknowledgment from {} after successful reconnect", socket, 
secondReadTimeoutEx);
+                                    exchange.setException(new 
MllpAcknowledgementTimeoutException("Timeout receiving complete HL7 
Acknowledgement after successful reconnect",
+                                        hl7MessageBytes, 
mllpBuffer.toByteArray(), secondReadTimeoutEx));
+                                    mllpBuffer.reset();
+                                }
+                                mllpBuffer.resetSocket(socket);
                             }
                         }
                     }
                 } catch (SocketTimeoutException timeoutEx) {
                     if (mllpBuffer.isEmpty()) {
+                        log.warn("Timeout receiving HL7 Acknowledgment from 
{}", socket, timeoutEx);
                         exchange.setException(new 
MllpAcknowledgementTimeoutException("Timeout receiving HL7 Acknowledgement", 
hl7MessageBytes, timeoutEx));
                     } else {
+                        log.warn("Timeout receiving complete HL7 
Acknowledgment from {}", socket, timeoutEx);
                         exchange.setException(new 
MllpAcknowledgementTimeoutException("Timeout receiving complete HL7 
Acknowledgement", hl7MessageBytes, mllpBuffer.toByteArray(), timeoutEx));
                         mllpBuffer.reset();
                     }
@@ -239,7 +262,7 @@ public class MllpTcpClientProducer extends DefaultProducer 
implements Runnable {
                     if (mllpBuffer.hasCompleteEnvelope()) {
                         byte[] acknowledgementBytes = 
mllpBuffer.toMllpPayload();
 
-                        log.debug("Populating message headers with the 
acknowledgement from the external system");
+                        log.debug("Populating message headers with the 
acknowledgement from the external system {}", socket);
                         message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, 
acknowledgementBytes);
                         if (acknowledgementBytes != null && 
acknowledgementBytes.length > 0) {
                             
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new 
String(acknowledgementBytes, getConfiguration().getCharset(exchange, 
acknowledgementBytes)));
@@ -255,7 +278,7 @@ public class MllpTcpClientProducer extends DefaultProducer 
implements Runnable {
                         }
 
                         if (exchange.getException() == null) {
-                            log.debug("Processing the acknowledgement from the 
external system");
+                            log.debug("Processing the acknowledgement from the 
external system {}", socket);
                             try {
                                 
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, 
processAcknowledgment(hl7MessageBytes, acknowledgementBytes));
                             } catch (MllpNegativeAcknowledgementException 
nackEx) {
@@ -272,6 +295,7 @@ public class MllpTcpClientProducer extends DefaultProducer 
implements Runnable {
             }
 
         } catch (IOException ioEx) {
+            log.debug("Exception encountered checking connection {}", socket, 
ioEx);
             exchange.setException(ioEx);
             mllpBuffer.resetSocket(socket);
         } finally {
@@ -358,26 +382,34 @@ public class MllpTcpClientProducer extends 
DefaultProducer implements Runnable {
      */
     void checkConnection() throws IOException {
         if (null == socket || socket.isClosed() || !socket.isConnected()) {
-            socket = new Socket();
+            if (socket == null) {
+                log.debug("checkConnection() - Socket is null - attempting to 
establish connection", socket);
+            } else if (socket.isClosed()) {
+                log.info("checkConnection() - Socket {} is closed - attempting 
to establish new connection", socket);
+            } else if (!socket.isConnected()) {
+                log.info("checkConnection() - Socket {} is not connected - 
attempting to establish new connection", socket);
+            }
+
+            Socket newSocket = new Socket();
 
             if (getConfiguration().hasKeepAlive()) {
-                socket.setKeepAlive(getConfiguration().getKeepAlive());
+                newSocket.setKeepAlive(getConfiguration().getKeepAlive());
             }
             if (getConfiguration().hasTcpNoDelay()) {
-                socket.setTcpNoDelay(getConfiguration().getTcpNoDelay());
+                newSocket.setTcpNoDelay(getConfiguration().getTcpNoDelay());
             }
 
             if (getConfiguration().hasReceiveBufferSize()) {
-                
socket.setReceiveBufferSize(getConfiguration().getReceiveBufferSize());
+                
newSocket.setReceiveBufferSize(getConfiguration().getReceiveBufferSize());
             }
             if (getConfiguration().hasSendBufferSize()) {
-                
socket.setSendBufferSize(getConfiguration().getSendBufferSize());
+                
newSocket.setSendBufferSize(getConfiguration().getSendBufferSize());
             }
             if (getConfiguration().hasReuseAddress()) {
-                socket.setReuseAddress(getConfiguration().getReuseAddress());
+                
newSocket.setReuseAddress(getConfiguration().getReuseAddress());
             }
 
-            socket.setSoLinger(false, -1);
+            newSocket.setSoLinger(false, -1);
 
             InetSocketAddress socketAddress;
             if (null == getEndpoint().getHostname()) {
@@ -386,7 +418,12 @@ public class MllpTcpClientProducer extends DefaultProducer 
implements Runnable {
                 socketAddress = new 
InetSocketAddress(getEndpoint().getHostname(), getEndpoint().getPort());
             }
 
-            socket.connect(socketAddress, 
getConfiguration().getConnectTimeout());
+            newSocket.connect(socketAddress, 
getConfiguration().getConnectTimeout());
+            log.info("checkConnection() - established new connection {}", 
newSocket);
+            getEndpoint().updateLastConnectionEstablishedTicks();
+
+            socket = newSocket;
+
             SocketAddress localSocketAddress = socket.getLocalSocketAddress();
             if (localSocketAddress != null) {
                 cachedLocalAddress = localSocketAddress.toString();
@@ -397,14 +434,12 @@ public class MllpTcpClientProducer extends 
DefaultProducer implements Runnable {
             }
             cachedCombinedAddress = 
MllpSocketBuffer.formatAddressString(localSocketAddress, remoteSocketAddress);
 
-            log.info("checkConnection() - established new connection {}", 
cachedCombinedAddress);
-            getEndpoint().updateLastConnectionEstablishedTicks();
-
             if (getConfiguration().hasIdleTimeout()) {
+                log.debug("Scheduling initial idle producer connection check 
of {} in {} milliseconds", getConnectionAddress(), 
getConfiguration().getIdleTimeout());
                 idleTimeoutExecutor.schedule(this, 
getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS);
             }
         } else {
-            log.debug("checkConnection() - Connection is still valid - no new 
connection required");
+            log.debug("checkConnection() - Connection {} is still valid - no 
new connection required", socket);
         }
     }
 
@@ -415,13 +450,13 @@ public class MllpTcpClientProducer extends 
DefaultProducer implements Runnable {
     public synchronized void run() {
         if (getConfiguration().hasIdleTimeout()) {
             if (null != socket && !socket.isClosed() && socket.isConnected()) {
-                if (lastProcessCallTicks > 0) {
-                    long idleTime = System.currentTimeMillis() - 
lastProcessCallTicks;
+                if (getEndpoint().hasLastConnectionActivityTicks()) {
+                    long idleTime = System.currentTimeMillis() - 
getEndpoint().getLastConnectionActivityTicks();
                     if (log.isDebugEnabled()) {
                         log.debug("Checking {} for idle connection: {} - {}", 
getConnectionAddress(), idleTime, getConfiguration().getIdleTimeout());
                     }
                     if (idleTime >= getConfiguration().getIdleTimeout()) {
-                        log.info("MLLP Connection idle time of '{}' 
milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - 
resetting conection",
+                        log.info("MLLP Connection idle time of '{}' 
milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - 
resetting connection",
                             idleTime, getConfiguration().getIdleTimeout());
                         mllpBuffer.resetSocket(socket);
                     } else {
@@ -433,7 +468,7 @@ public class MllpTcpClientProducer extends DefaultProducer 
implements Runnable {
                         idleTimeoutExecutor.schedule(this, delay, 
TimeUnit.MILLISECONDS);
                     }
                 } else {
-                    log.debug("Scheduling idle producer connection check in {} 
milliseconds", getConfiguration().getIdleTimeout());
+                    log.debug("No activity detected since initial connection - 
scheduling idle producer connection check in {} milliseconds", 
getConfiguration().getIdleTimeout());
                     idleTimeoutExecutor.schedule(this, 
getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS);
                 }
             }
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
index 020ec74..1d7daf6 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
@@ -175,8 +175,8 @@ public class MllpSocketBuffer {
     }
 
     public synchronized void readFrom(Socket socket, int receiveTimeout, int 
readTimeout) throws MllpSocketException, SocketTimeoutException {
-        log.trace("Entering readFrom ...");
         if (socket != null && socket.isConnected() && !socket.isClosed()) {
+            log.trace("Entering readFrom for {} ...", socket);
             ensureCapacity(MIN_BUFFER_SIZE);
 
             try {
@@ -203,7 +203,7 @@ public class MllpSocketBuffer {
             } finally {
                 if (size() > 0 && !hasCompleteEnvelope()) {
                     if (!hasEndOfData() && hasEndOfBlock() && endOfBlockIndex 
< size() - 1) {
-                        log.warn("readFrom exiting with partial payload ", 
Hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1));
+                        log.warn("readFrom {} exiting with partial payload 
{}", socket, Hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1));
                     }
                 }
             }
@@ -216,8 +216,8 @@ public class MllpSocketBuffer {
     }
 
     public synchronized void writeTo(Socket socket) throws MllpSocketException 
{
-        log.trace("Entering writeTo ...");
         if (socket != null && socket.isConnected() && !socket.isClosed()) {
+            log.trace("Entering writeTo for {} ...", socket);
             if (!isEmpty()) {
                 try {
                     OutputStream socketOutputStream = socket.getOutputStream();
@@ -243,7 +243,7 @@ public class MllpSocketBuffer {
                     throw new MllpSocketException(exceptionMessage, ioEx);
                 }
             } else {
-                log.warn("Ignoring call to writeTo(byte[] payload) - MLLP 
payload is null or empty");
+                log.warn("Ignoring call to writeTo(byte[] payload) for {} - 
MLLP payload is null or empty", socket);
             }
         } else {
             log.warn("Socket is invalid - no data written");
@@ -617,7 +617,7 @@ public class MllpSocketBuffer {
             int readCount = socketInputStream.read(buffer, availableByteCount, 
buffer.length - availableByteCount);
             if (readCount == MllpProtocolConstants.END_OF_STREAM) {
                 resetSocket(socket);
-                throw new SocketException("END_OF_STREAM returned from 
SocketInputStream.read(byte[], off, len)");
+                throw new MllpSocketException("END_OF_STREAM returned from 
SocketInputStream.read(byte[], off, len)");
             }
             if (readCount > 0) {
                 for (int i = 0; (startOfBlockIndex == -1 || endOfBlockIndex == 
-1) && i < readCount; ++i) {
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java
index c348438..dbc6575 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java
@@ -45,6 +45,9 @@ public class MllpTcpClientProducerConnectionErrorTest extends 
CamelTestSupport {
     @EndpointInject(uri = "direct://source")
     ProducerTemplate source;
 
+    @EndpointInject(uri = "mock://target")
+    MockEndpoint target;
+
     @EndpointInject(uri = "mock://complete")
     MockEndpoint complete;
 
@@ -54,8 +57,8 @@ public class MllpTcpClientProducerConnectionErrorTest extends 
CamelTestSupport {
     @EndpointInject(uri = "mock://connect-ex")
     MockEndpoint connectEx;
 
-    @EndpointInject(uri = "mock://receive-ex")
-    MockEndpoint receiveEx;
+    @EndpointInject(uri = "mock://acknowledgement-ex")
+    MockEndpoint acknowledgementEx;
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
@@ -74,6 +77,9 @@ public class MllpTcpClientProducerConnectionErrorTest extends 
CamelTestSupport {
             String routeId = "mllp-sender";
 
             public void configure() {
+                onCompletion()
+                    .to(complete);
+
                 onException(ConnectException.class)
                     .handled(true)
                     .to(connectEx)
@@ -86,17 +92,17 @@ public class MllpTcpClientProducerConnectionErrorTest 
extends CamelTestSupport {
                     .log(LoggingLevel.ERROR, routeId, "Write Error")
                     .stop();
 
-                onException(MllpAcknowledgementReceiveException.class)
+                onException(MllpAcknowledgementException.class)
                     .handled(true)
-                    .to(receiveEx)
-                    .log(LoggingLevel.ERROR, routeId, "Receive Error")
+                    .to(acknowledgementEx)
+                    .log(LoggingLevel.ERROR, routeId, "Acknowledgement Error")
                     .stop();
 
                 from(source.getDefaultEndpoint()).routeId(routeId)
                     .log(LoggingLevel.INFO, routeId, "Sending Message")
                     .toF("mllp://%s:%d", mllpServer.getListenHost(), 
mllpServer.getListenPort())
                     .log(LoggingLevel.INFO, routeId, "Received 
Acknowledgement")
-                    .to(complete);
+                    .to(target);
             }
         };
     }
@@ -108,10 +114,11 @@ public class MllpTcpClientProducerConnectionErrorTest 
extends CamelTestSupport {
      */
     @Test
     public void testConnectionClosedBeforeSendingHL7Message() throws Exception 
{
+        target.expectedMessageCount(2);
         complete.expectedMessageCount(2);
         connectEx.expectedMessageCount(0);
         writeEx.expectedMessageCount(0);
-        receiveEx.expectedMessageCount(0);
+        acknowledgementEx.expectedMessageCount(0);
 
         NotifyBuilder oneDone = new 
NotifyBuilder(context).whenCompleted(1).create();
         NotifyBuilder twoDone = new 
NotifyBuilder(context).whenCompleted(2).create();
@@ -136,10 +143,11 @@ public class MllpTcpClientProducerConnectionErrorTest 
extends CamelTestSupport {
      */
     @Test()
     public void testConnectionResetBeforeSendingHL7Message() throws Exception {
+        target.expectedMessageCount(2);
         complete.expectedMessageCount(2);
         connectEx.expectedMessageCount(0);
         writeEx.expectedMessageCount(0);
-        receiveEx.expectedMessageCount(0);
+        acknowledgementEx.expectedMessageCount(0);
 
         NotifyBuilder oneDone = new 
NotifyBuilder(context).whenCompleted(1).create();
         NotifyBuilder twoDone = new 
NotifyBuilder(context).whenCompleted(2).create();
@@ -158,10 +166,11 @@ public class MllpTcpClientProducerConnectionErrorTest 
extends CamelTestSupport {
 
     @Test()
     public void testConnectionClosedBeforeReadingAcknowledgement() throws 
Exception {
-        complete.expectedMessageCount(0);
+        target.expectedMessageCount(0);
+        complete.expectedMessageCount(1);
         connectEx.expectedMessageCount(0);
         writeEx.expectedMessageCount(0);
-        receiveEx.expectedMessageCount(1);
+        acknowledgementEx.expectedMessageCount(1);
 
         mllpServer.setCloseSocketBeforeAcknowledgementModulus(1);
 
@@ -176,10 +185,11 @@ public class MllpTcpClientProducerConnectionErrorTest 
extends CamelTestSupport {
 
     @Test()
     public void testConnectionResetBeforeReadingAcknowledgement() throws 
Exception {
-        complete.expectedMessageCount(0);
+        target.expectedMessageCount(0);
+        complete.expectedMessageCount(1);
         connectEx.expectedMessageCount(0);
         writeEx.expectedMessageCount(0);
-        receiveEx.expectedMessageCount(1);
+        acknowledgementEx.expectedMessageCount(1);
 
         mllpServer.setResetSocketBeforeAcknowledgementModulus(1);
 
@@ -195,7 +205,8 @@ public class MllpTcpClientProducerConnectionErrorTest 
extends CamelTestSupport {
 
     @Test()
     public void testServerShutdownBeforeSendingHL7Message() throws Exception {
-        complete.expectedMessageCount(1);
+        target.expectedMessageCount(1);
+        complete.expectedMessageCount(2);
         connectEx.expectedMessageCount(0);
 
         NotifyBuilder done = new 
NotifyBuilder(context).whenCompleted(2).create();
@@ -212,12 +223,13 @@ public class MllpTcpClientProducerConnectionErrorTest 
extends CamelTestSupport {
         assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
 
         // Depending on the timing, either a write or a receive exception will 
be thrown
-        assertEquals("Either a write or a receive exception should have been 
be thrown", 1, writeEx.getExchanges().size() + receiveEx.getExchanges().size());
+        assertEquals("Either a write or a receive exception should have been 
be thrown", 1, writeEx.getExchanges().size() + 
acknowledgementEx.getExchanges().size());
     }
 
     @Test()
     public void testConnectionCloseAndServerShutdownBeforeSendingHL7Message() 
throws Exception {
-        complete.expectedMessageCount(1);
+        target.expectedMessageCount(1);
+        complete.expectedMessageCount(2);
         connectEx.expectedMessageCount(0);
 
         NotifyBuilder done = new 
NotifyBuilder(context).whenCompleted(2).create();
@@ -235,15 +247,16 @@ public class MllpTcpClientProducerConnectionErrorTest 
extends CamelTestSupport {
         assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
 
         // Depending on the timing, either a write or a receive exception will 
be thrown
-        assertEquals("Either a write or a receive exception should have been 
be thrown", 1, writeEx.getExchanges().size() + receiveEx.getExchanges().size());
+        assertEquals("Either a write or a receive exception should have been 
be thrown", 1, writeEx.getExchanges().size() + 
acknowledgementEx.getExchanges().size());
     }
 
     @Test()
     public void testConnectionResetAndServerShutdownBeforeSendingHL7Message() 
throws Exception {
-        complete.expectedMessageCount(1);
-        connectEx.expectedMessageCount(1);
-        writeEx.expectedMessageCount(0);
-        receiveEx.expectedMessageCount(0);
+        target.expectedMessageCount(1);
+        complete.expectedMessageCount(2);
+        connectEx.expectedMessageCount(0);
+        writeEx.expectedMessageCount(1);
+        acknowledgementEx.expectedMessageCount(0);
 
         NotifyBuilder done = new 
NotifyBuilder(context).whenCompleted(2).create();
 

-- 
To stop receiving notification emails like this one, please contact
qu...@apache.org.

Reply via email to