This is an automated email from the ASF dual-hosted git repository. quinn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 14d2cf99cd0fbd85ab5a8383e4516acdffb65a7b Author: Quinn Stevenson <qu...@apache.org> AuthorDate: Thu Mar 15 07:49:00 2018 -0600 CAMEL-12325: Correct idleTimeout behaviour for the MllpTcpClientProducer --- .../apache/camel/component/mllp/MllpEndpoint.java | 11 +- .../component/mllp/MllpTcpClientProducer.java | 111 ++++++++++++++------- .../component/mllp/internal/MllpSocketBuffer.java | 10 +- .../MllpTcpClientProducerConnectionErrorTest.java | 53 ++++++---- 4 files changed, 121 insertions(+), 64 deletions(-) diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java index ae8975b..dbadfa8 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java @@ -53,7 +53,8 @@ import org.slf4j.LoggerFactory; * <p/> */ @ManagedResource(description = "MLLP Endpoint") -@UriEndpoint(scheme = "mllp", firstVersion = "2.17.0", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp") +// @UriEndpoint(scheme = "mllp", firstVersion = "2.17.0", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp") +@UriEndpoint(scheme = "mllp", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp") public class MllpEndpoint extends DefaultEndpoint { // Use constants from MllpProtocolConstants @Deprecated() @@ -175,6 +176,14 @@ public class MllpEndpoint extends DefaultEndpoint { return lastConnectionTerminatedTicks != null ? new Date(lastConnectionTerminatedTicks) : null; } + public boolean hasLastConnectionActivityTicks() { + return lastConnectionActivityTicks != null && lastConnectionActivityTicks > 0; + } + + public Long getLastConnectionActivityTicks() { + return lastConnectionActivityTicks; + } + public void updateLastConnectionActivityTicks() { updateLastConnectionActivityTicks(System.currentTimeMillis()); } diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java index 9b7ca63..df5705a 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java @@ -39,6 +39,10 @@ import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.mllp.internal.Hl7Util; import org.apache.camel.component.mllp.internal.MllpSocketBuffer; import org.apache.camel.impl.DefaultProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + /** * The MLLP producer. @@ -50,7 +54,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { final MllpSocketBuffer mllpBuffer; ScheduledExecutorService idleTimeoutExecutor; - long lastProcessCallTicks = -1; + // long lastProcessCallTicks = -1; private String cachedLocalAddress; private String cachedRemoteAddress; @@ -65,7 +69,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { @ManagedAttribute(description = "Last activity time") public Date getLastActivityTime() { - return new Date(lastProcessCallTicks); + return getEndpoint().getLastConnectionActivityTime(); } @ManagedAttribute(description = "Connection") @@ -122,8 +126,8 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { } @Override - public synchronized void process(Exchange exchange) throws Exception { - log.trace("Processing Exchange {}", exchange.getExchangeId()); + public synchronized void process(Exchange exchange) throws MllpException { + log.trace("Processing Exchange {} for {}", exchange.getExchangeId(), socket); getEndpoint().updateLastConnectionActivityTicks(); Message message = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); @@ -158,77 +162,96 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { } } - log.debug("Sending message to external system"); - getEndpoint().updateLastConnectionEstablishedTicks(); + log.debug("Sending message to external system {}", socket); try { mllpBuffer.setEnvelopedMessage(hl7MessageBytes); mllpBuffer.writeTo(socket); } catch (MllpSocketException writeEx) { // Connection may have been reset - try one more time - log.debug("Exception encountered reading acknowledgement - attempting reconnect", writeEx); + log.debug("Exception encountered writing payload to {} - attempting reconnect", writeEx, socket); try { checkConnection(); - log.trace("Reconnected succeeded - resending payload"); + log.trace("Reconnected succeeded - resending payload to {}", socket); try { mllpBuffer.writeTo(socket); } catch (MllpSocketException retryWriteEx) { - exchange.setException(retryWriteEx); + log.warn("Exception encountered attempting to write payload to {} after reconnect - sending original exception to exchange", socket, retryWriteEx); + exchange.setException(new MllpWriteException("Exception encountered writing payload after reconnect", mllpBuffer.toByteArrayAndReset(), retryWriteEx)); } } catch (IOException reconnectEx) { - log.debug("Reconnected failed - sending exception to exchange", reconnectEx); - exchange.setException(reconnectEx); + log.warn("Exception encountered attempting to reconnect - sending original exception to exchange", reconnectEx); + exchange.setException(new MllpWriteException("Exception encountered writing payload", mllpBuffer.toByteArrayAndReset(), writeEx)); + mllpBuffer.resetSocket(socket); } - } if (exchange.getException() == null) { - log.debug("Reading acknowledgement from external system"); + log.debug("Reading acknowledgement from external system {}", socket); try { mllpBuffer.reset(); mllpBuffer.readFrom(socket); } catch (MllpSocketException receiveAckEx) { // Connection may have been reset - try one more time - log.debug("Exception encountered reading acknowledgement - attempting reconnect", receiveAckEx); + log.debug("Exception encountered reading acknowledgement from {} - attempting reconnect", socket, receiveAckEx); try { checkConnection(); } catch (IOException reconnectEx) { - log.debug("Reconnected failed - sending original exception to exchange", reconnectEx); + log.warn("Exception encountered attempting to reconnect after acknowledgement read failure - sending original acknowledgement exception to exchange", reconnectEx); exchange.setException(new MllpAcknowledgementReceiveException("Exception encountered reading acknowledgement", hl7MessageBytes, receiveAckEx)); + mllpBuffer.resetSocket(socket); } if (exchange.getException() == null) { - log.trace("Reconnected succeeded - resending payload"); + log.trace("Reconnected succeeded - resending payload to {}", socket); try { mllpBuffer.setEnvelopedMessage(hl7MessageBytes); mllpBuffer.writeTo(socket); } catch (MllpSocketException writeRetryEx) { - exchange.setException(new MllpWriteException("Failed to write HL7 message to socket", hl7MessageBytes, writeRetryEx)); + log.warn("Exception encountered attempting to write payload to {} after read failure and successful reconnect - sending original exception to exchange", + socket, writeRetryEx); + exchange.setException(new MllpWriteException("Exception encountered writing payload after read failure and reconnect", hl7MessageBytes, receiveAckEx)); } if (exchange.getException() == null) { - log.trace("Resend succeeded - reading acknowledgement"); + log.trace("Resend succeeded - reading acknowledgement from {}", socket); try { mllpBuffer.reset(); mllpBuffer.readFrom(socket); } catch (MllpSocketException secondReceiveEx) { if (mllpBuffer.isEmpty()) { - Exception exchangeEx = new MllpAcknowledgementReceiveException("Exception encountered receiving Acknowledgement", hl7MessageBytes, secondReceiveEx); + log.warn("Exception encountered reading acknowledgement from {} after successful reconnect and resend", socket, secondReceiveEx); + Exception exchangeEx = new MllpAcknowledgementReceiveException("Exception encountered receiving Acknowledgement", hl7MessageBytes, receiveAckEx); exchange.setException(exchangeEx); } else { byte[] partialAcknowledgment = mllpBuffer.toByteArray(); mllpBuffer.reset(); + log.warn("Exception encountered reading a complete acknowledgement from {} after successful reconnect and resend", socket, secondReceiveEx); Exception exchangeEx = new MllpAcknowledgementReceiveException("Exception encountered receiving complete Acknowledgement", - hl7MessageBytes, partialAcknowledgment, secondReceiveEx); + hl7MessageBytes, partialAcknowledgment, receiveAckEx); exchange.setException(exchangeEx); } + } catch (SocketTimeoutException secondReadTimeoutEx) { + if (mllpBuffer.isEmpty()) { + log.warn("Timeout receiving HL7 Acknowledgment from {} after successful reconnect", socket, secondReadTimeoutEx); + exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving HL7 Acknowledgement after successful reconnect", + hl7MessageBytes, secondReadTimeoutEx)); + } else { + log.warn("Timeout receiving complete HL7 Acknowledgment from {} after successful reconnect", socket, secondReadTimeoutEx); + exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving complete HL7 Acknowledgement after successful reconnect", + hl7MessageBytes, mllpBuffer.toByteArray(), secondReadTimeoutEx)); + mllpBuffer.reset(); + } + mllpBuffer.resetSocket(socket); } } } } catch (SocketTimeoutException timeoutEx) { if (mllpBuffer.isEmpty()) { + log.warn("Timeout receiving HL7 Acknowledgment from {}", socket, timeoutEx); exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving HL7 Acknowledgement", hl7MessageBytes, timeoutEx)); } else { + log.warn("Timeout receiving complete HL7 Acknowledgment from {}", socket, timeoutEx); exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving complete HL7 Acknowledgement", hl7MessageBytes, mllpBuffer.toByteArray(), timeoutEx)); mllpBuffer.reset(); } @@ -239,7 +262,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { if (mllpBuffer.hasCompleteEnvelope()) { byte[] acknowledgementBytes = mllpBuffer.toMllpPayload(); - log.debug("Populating message headers with the acknowledgement from the external system"); + log.debug("Populating message headers with the acknowledgement from the external system {}", socket); message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementBytes); if (acknowledgementBytes != null && acknowledgementBytes.length > 0) { message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String(acknowledgementBytes, getConfiguration().getCharset(exchange, acknowledgementBytes))); @@ -255,7 +278,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { } if (exchange.getException() == null) { - log.debug("Processing the acknowledgement from the external system"); + log.debug("Processing the acknowledgement from the external system {}", socket); try { message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, processAcknowledgment(hl7MessageBytes, acknowledgementBytes)); } catch (MllpNegativeAcknowledgementException nackEx) { @@ -272,6 +295,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { } } catch (IOException ioEx) { + log.debug("Exception encountered checking connection {}", socket, ioEx); exchange.setException(ioEx); mllpBuffer.resetSocket(socket); } finally { @@ -358,26 +382,34 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { */ void checkConnection() throws IOException { if (null == socket || socket.isClosed() || !socket.isConnected()) { - socket = new Socket(); + if (socket == null) { + log.debug("checkConnection() - Socket is null - attempting to establish connection", socket); + } else if (socket.isClosed()) { + log.info("checkConnection() - Socket {} is closed - attempting to establish new connection", socket); + } else if (!socket.isConnected()) { + log.info("checkConnection() - Socket {} is not connected - attempting to establish new connection", socket); + } + + Socket newSocket = new Socket(); if (getConfiguration().hasKeepAlive()) { - socket.setKeepAlive(getConfiguration().getKeepAlive()); + newSocket.setKeepAlive(getConfiguration().getKeepAlive()); } if (getConfiguration().hasTcpNoDelay()) { - socket.setTcpNoDelay(getConfiguration().getTcpNoDelay()); + newSocket.setTcpNoDelay(getConfiguration().getTcpNoDelay()); } if (getConfiguration().hasReceiveBufferSize()) { - socket.setReceiveBufferSize(getConfiguration().getReceiveBufferSize()); + newSocket.setReceiveBufferSize(getConfiguration().getReceiveBufferSize()); } if (getConfiguration().hasSendBufferSize()) { - socket.setSendBufferSize(getConfiguration().getSendBufferSize()); + newSocket.setSendBufferSize(getConfiguration().getSendBufferSize()); } if (getConfiguration().hasReuseAddress()) { - socket.setReuseAddress(getConfiguration().getReuseAddress()); + newSocket.setReuseAddress(getConfiguration().getReuseAddress()); } - socket.setSoLinger(false, -1); + newSocket.setSoLinger(false, -1); InetSocketAddress socketAddress; if (null == getEndpoint().getHostname()) { @@ -386,7 +418,12 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { socketAddress = new InetSocketAddress(getEndpoint().getHostname(), getEndpoint().getPort()); } - socket.connect(socketAddress, getConfiguration().getConnectTimeout()); + newSocket.connect(socketAddress, getConfiguration().getConnectTimeout()); + log.info("checkConnection() - established new connection {}", newSocket); + getEndpoint().updateLastConnectionEstablishedTicks(); + + socket = newSocket; + SocketAddress localSocketAddress = socket.getLocalSocketAddress(); if (localSocketAddress != null) { cachedLocalAddress = localSocketAddress.toString(); @@ -397,14 +434,12 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { } cachedCombinedAddress = MllpSocketBuffer.formatAddressString(localSocketAddress, remoteSocketAddress); - log.info("checkConnection() - established new connection {}", cachedCombinedAddress); - getEndpoint().updateLastConnectionEstablishedTicks(); - if (getConfiguration().hasIdleTimeout()) { + log.debug("Scheduling initial idle producer connection check of {} in {} milliseconds", getConnectionAddress(), getConfiguration().getIdleTimeout()); idleTimeoutExecutor.schedule(this, getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS); } } else { - log.debug("checkConnection() - Connection is still valid - no new connection required"); + log.debug("checkConnection() - Connection {} is still valid - no new connection required", socket); } } @@ -415,13 +450,13 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { public synchronized void run() { if (getConfiguration().hasIdleTimeout()) { if (null != socket && !socket.isClosed() && socket.isConnected()) { - if (lastProcessCallTicks > 0) { - long idleTime = System.currentTimeMillis() - lastProcessCallTicks; + if (getEndpoint().hasLastConnectionActivityTicks()) { + long idleTime = System.currentTimeMillis() - getEndpoint().getLastConnectionActivityTicks(); if (log.isDebugEnabled()) { log.debug("Checking {} for idle connection: {} - {}", getConnectionAddress(), idleTime, getConfiguration().getIdleTimeout()); } if (idleTime >= getConfiguration().getIdleTimeout()) { - log.info("MLLP Connection idle time of '{}' milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - resetting conection", + log.info("MLLP Connection idle time of '{}' milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - resetting connection", idleTime, getConfiguration().getIdleTimeout()); mllpBuffer.resetSocket(socket); } else { @@ -433,7 +468,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { idleTimeoutExecutor.schedule(this, delay, TimeUnit.MILLISECONDS); } } else { - log.debug("Scheduling idle producer connection check in {} milliseconds", getConfiguration().getIdleTimeout()); + log.debug("No activity detected since initial connection - scheduling idle producer connection check in {} milliseconds", getConfiguration().getIdleTimeout()); idleTimeoutExecutor.schedule(this, getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS); } } diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java index 020ec74..1d7daf6 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java @@ -175,8 +175,8 @@ public class MllpSocketBuffer { } public synchronized void readFrom(Socket socket, int receiveTimeout, int readTimeout) throws MllpSocketException, SocketTimeoutException { - log.trace("Entering readFrom ..."); if (socket != null && socket.isConnected() && !socket.isClosed()) { + log.trace("Entering readFrom for {} ...", socket); ensureCapacity(MIN_BUFFER_SIZE); try { @@ -203,7 +203,7 @@ public class MllpSocketBuffer { } finally { if (size() > 0 && !hasCompleteEnvelope()) { if (!hasEndOfData() && hasEndOfBlock() && endOfBlockIndex < size() - 1) { - log.warn("readFrom exiting with partial payload ", Hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1)); + log.warn("readFrom {} exiting with partial payload {}", socket, Hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1)); } } } @@ -216,8 +216,8 @@ public class MllpSocketBuffer { } public synchronized void writeTo(Socket socket) throws MllpSocketException { - log.trace("Entering writeTo ..."); if (socket != null && socket.isConnected() && !socket.isClosed()) { + log.trace("Entering writeTo for {} ...", socket); if (!isEmpty()) { try { OutputStream socketOutputStream = socket.getOutputStream(); @@ -243,7 +243,7 @@ public class MllpSocketBuffer { throw new MllpSocketException(exceptionMessage, ioEx); } } else { - log.warn("Ignoring call to writeTo(byte[] payload) - MLLP payload is null or empty"); + log.warn("Ignoring call to writeTo(byte[] payload) for {} - MLLP payload is null or empty", socket); } } else { log.warn("Socket is invalid - no data written"); @@ -617,7 +617,7 @@ public class MllpSocketBuffer { int readCount = socketInputStream.read(buffer, availableByteCount, buffer.length - availableByteCount); if (readCount == MllpProtocolConstants.END_OF_STREAM) { resetSocket(socket); - throw new SocketException("END_OF_STREAM returned from SocketInputStream.read(byte[], off, len)"); + throw new MllpSocketException("END_OF_STREAM returned from SocketInputStream.read(byte[], off, len)"); } if (readCount > 0) { for (int i = 0; (startOfBlockIndex == -1 || endOfBlockIndex == -1) && i < readCount; ++i) { diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java index c348438..dbc6575 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java @@ -45,6 +45,9 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { @EndpointInject(uri = "direct://source") ProducerTemplate source; + @EndpointInject(uri = "mock://target") + MockEndpoint target; + @EndpointInject(uri = "mock://complete") MockEndpoint complete; @@ -54,8 +57,8 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { @EndpointInject(uri = "mock://connect-ex") MockEndpoint connectEx; - @EndpointInject(uri = "mock://receive-ex") - MockEndpoint receiveEx; + @EndpointInject(uri = "mock://acknowledgement-ex") + MockEndpoint acknowledgementEx; @Override protected CamelContext createCamelContext() throws Exception { @@ -74,6 +77,9 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { String routeId = "mllp-sender"; public void configure() { + onCompletion() + .to(complete); + onException(ConnectException.class) .handled(true) .to(connectEx) @@ -86,17 +92,17 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { .log(LoggingLevel.ERROR, routeId, "Write Error") .stop(); - onException(MllpAcknowledgementReceiveException.class) + onException(MllpAcknowledgementException.class) .handled(true) - .to(receiveEx) - .log(LoggingLevel.ERROR, routeId, "Receive Error") + .to(acknowledgementEx) + .log(LoggingLevel.ERROR, routeId, "Acknowledgement Error") .stop(); from(source.getDefaultEndpoint()).routeId(routeId) .log(LoggingLevel.INFO, routeId, "Sending Message") .toF("mllp://%s:%d", mllpServer.getListenHost(), mllpServer.getListenPort()) .log(LoggingLevel.INFO, routeId, "Received Acknowledgement") - .to(complete); + .to(target); } }; } @@ -108,10 +114,11 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { */ @Test public void testConnectionClosedBeforeSendingHL7Message() throws Exception { + target.expectedMessageCount(2); complete.expectedMessageCount(2); connectEx.expectedMessageCount(0); writeEx.expectedMessageCount(0); - receiveEx.expectedMessageCount(0); + acknowledgementEx.expectedMessageCount(0); NotifyBuilder oneDone = new NotifyBuilder(context).whenCompleted(1).create(); NotifyBuilder twoDone = new NotifyBuilder(context).whenCompleted(2).create(); @@ -136,10 +143,11 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { */ @Test() public void testConnectionResetBeforeSendingHL7Message() throws Exception { + target.expectedMessageCount(2); complete.expectedMessageCount(2); connectEx.expectedMessageCount(0); writeEx.expectedMessageCount(0); - receiveEx.expectedMessageCount(0); + acknowledgementEx.expectedMessageCount(0); NotifyBuilder oneDone = new NotifyBuilder(context).whenCompleted(1).create(); NotifyBuilder twoDone = new NotifyBuilder(context).whenCompleted(2).create(); @@ -158,10 +166,11 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { @Test() public void testConnectionClosedBeforeReadingAcknowledgement() throws Exception { - complete.expectedMessageCount(0); + target.expectedMessageCount(0); + complete.expectedMessageCount(1); connectEx.expectedMessageCount(0); writeEx.expectedMessageCount(0); - receiveEx.expectedMessageCount(1); + acknowledgementEx.expectedMessageCount(1); mllpServer.setCloseSocketBeforeAcknowledgementModulus(1); @@ -176,10 +185,11 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { @Test() public void testConnectionResetBeforeReadingAcknowledgement() throws Exception { - complete.expectedMessageCount(0); + target.expectedMessageCount(0); + complete.expectedMessageCount(1); connectEx.expectedMessageCount(0); writeEx.expectedMessageCount(0); - receiveEx.expectedMessageCount(1); + acknowledgementEx.expectedMessageCount(1); mllpServer.setResetSocketBeforeAcknowledgementModulus(1); @@ -195,7 +205,8 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { @Test() public void testServerShutdownBeforeSendingHL7Message() throws Exception { - complete.expectedMessageCount(1); + target.expectedMessageCount(1); + complete.expectedMessageCount(2); connectEx.expectedMessageCount(0); NotifyBuilder done = new NotifyBuilder(context).whenCompleted(2).create(); @@ -212,12 +223,13 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { assertMockEndpointsSatisfied(5, TimeUnit.SECONDS); // Depending on the timing, either a write or a receive exception will be thrown - assertEquals("Either a write or a receive exception should have been be thrown", 1, writeEx.getExchanges().size() + receiveEx.getExchanges().size()); + assertEquals("Either a write or a receive exception should have been be thrown", 1, writeEx.getExchanges().size() + acknowledgementEx.getExchanges().size()); } @Test() public void testConnectionCloseAndServerShutdownBeforeSendingHL7Message() throws Exception { - complete.expectedMessageCount(1); + target.expectedMessageCount(1); + complete.expectedMessageCount(2); connectEx.expectedMessageCount(0); NotifyBuilder done = new NotifyBuilder(context).whenCompleted(2).create(); @@ -235,15 +247,16 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { assertMockEndpointsSatisfied(5, TimeUnit.SECONDS); // Depending on the timing, either a write or a receive exception will be thrown - assertEquals("Either a write or a receive exception should have been be thrown", 1, writeEx.getExchanges().size() + receiveEx.getExchanges().size()); + assertEquals("Either a write or a receive exception should have been be thrown", 1, writeEx.getExchanges().size() + acknowledgementEx.getExchanges().size()); } @Test() public void testConnectionResetAndServerShutdownBeforeSendingHL7Message() throws Exception { - complete.expectedMessageCount(1); - connectEx.expectedMessageCount(1); - writeEx.expectedMessageCount(0); - receiveEx.expectedMessageCount(0); + target.expectedMessageCount(1); + complete.expectedMessageCount(2); + connectEx.expectedMessageCount(0); + writeEx.expectedMessageCount(1); + acknowledgementEx.expectedMessageCount(0); NotifyBuilder done = new NotifyBuilder(context).whenCompleted(2).create(); -- To stop receiving notification emails like this one, please contact qu...@apache.org.