This is an automated email from the ASF dual-hosted git repository. quinn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new c8f24ca CAMEL-12210 - Improve detection of the end of an MLLP Envelope c8f24ca is described below commit c8f24cac46f9c4a02a869597a0e7ba03565aa9fd Author: Quinn Stevenson <qu...@apache.org> AuthorDate: Mon Jan 29 15:58:00 2018 -0700 CAMEL-12210 - Improve detection of the end of an MLLP Envelope --- .../component/mllp/MllpTcpServerConsumer.java | 55 ++++-- .../component/mllp/internal/MllpSocketBuffer.java | 35 +++- ...eptRunnable.java => TcpServerAcceptThread.java} | 24 ++- .../TcpServerConsumerValidationRunnable.java | 196 +++++++++++++++++++++ .../mllp/internal/TcpSocketConsumerRunnable.java | 45 +++-- ...TcpClientProducerIdleConnectionTimeoutTest.java | 8 +- .../mllp/MllpTcpServerConsumerConnectionTest.java | 5 +- ...onsumerOptionalEndOfDataWithValidationTest.java | 13 +- ...umerOptionalEndOfDataWithoutValidationTest.java | 14 +- ...onsumerRequiredEndOfDataWithValidationTest.java | 15 +- ...umerRequiredEndOfDataWithoutValidationTest.java | 12 +- ...tProducerEndOfDataAndValidationTestSupport.java | 6 +- ...rConsumerEndOfDataAndValidationTestSupport.java | 63 ++++--- .../mllp/internal/MllpSocketBufferTest.java | 4 +- .../mllp/internal/MllpSocketBufferWriteTest.java | 4 +- .../src/test/resources/log4j2.properties | 2 +- 16 files changed, 412 insertions(+), 89 deletions(-) diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java index bc4f41d..72b00bd 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -36,29 +37,27 @@ import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.mllp.internal.MllpSocketBuffer; -import org.apache.camel.component.mllp.internal.TcpServerAcceptRunnable; +import org.apache.camel.component.mllp.internal.TcpServerConsumerValidationRunnable; +import org.apache.camel.component.mllp.internal.TcpServerAcceptThread; import org.apache.camel.component.mllp.internal.TcpSocketConsumerRunnable; import org.apache.camel.impl.DefaultConsumer; -import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerator; /** * The MLLP consumer. */ @ManagedResource(description = "MLLP Producer") public class MllpTcpServerConsumer extends DefaultConsumer { - final ExecutorService acceptExecutor; + final ExecutorService validationExecutor; final ExecutorService consumerExecutor; - TcpServerAcceptRunnable acceptRunnable; + TcpServerAcceptThread acceptThread; Map<TcpSocketConsumerRunnable, Long> consumerRunnables = new ConcurrentHashMap<>(); public MllpTcpServerConsumer(MllpEndpoint endpoint, Processor processor) { super(endpoint, processor); log.trace("MllpTcpServerConsumer(endpoint, processor)"); - // this.endpoint = endpoint; - // this.configuration = endpoint.getConfiguration(); - acceptExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>()); + validationExecutor = Executors.newCachedThreadPool(); consumerExecutor = new ThreadPoolExecutor(1, getConfiguration().getMaxConcurrentConsumers(), getConfiguration().getAcceptTimeout(), TimeUnit.MILLISECONDS, new SynchronousQueue<>()); } @@ -111,9 +110,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer { consumerClientSocketThread.stop(); } - acceptRunnable.stop(); + acceptThread.interrupt(); - acceptRunnable = null; + acceptThread = null; super.doStop(); } @@ -171,8 +170,10 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } } while (!serverSocket.isBound()); - acceptRunnable = new TcpServerAcceptRunnable(this, serverSocket); - acceptExecutor.submit(acceptRunnable); + // acceptRunnable = new TcpServerConsumerValidationRunnable(this, serverSocket); + // validationExecutor.submit(acceptRunnable); + acceptThread = new TcpServerAcceptThread(this, serverSocket); + acceptThread.start(); super.doStart(); } @@ -181,7 +182,10 @@ public class MllpTcpServerConsumer extends DefaultConsumer { protected void doShutdown() throws Exception { super.doShutdown(); consumerExecutor.shutdownNow(); - acceptExecutor.shutdownNow(); + if (acceptThread != null) { + acceptThread.interrupt(); + } + validationExecutor.shutdownNow(); } public MllpConfiguration getConfiguration() { @@ -192,8 +196,21 @@ public class MllpTcpServerConsumer extends DefaultConsumer { return consumerRunnables; } - public void startConsumer(Socket clientSocket) { - TcpSocketConsumerRunnable client = new TcpSocketConsumerRunnable(this, clientSocket); + public void validateConsumer(Socket clientSocket) { + MllpSocketBuffer mllpBuffer = new MllpSocketBuffer(getEndpoint()); + TcpServerConsumerValidationRunnable client = new TcpServerConsumerValidationRunnable(this, clientSocket, mllpBuffer); + + try { + log.info("Validating consumer for Socket {}", clientSocket); + validationExecutor.submit(client); + } catch (RejectedExecutionException rejectedExecutionEx) { + log.warn("Cannot validate consumer - max validations already active"); + mllpBuffer.resetSocket(clientSocket); + } + } + + public void startConsumer(Socket clientSocket, MllpSocketBuffer mllpBuffer) { + TcpSocketConsumerRunnable client = new TcpSocketConsumerRunnable(this, clientSocket, mllpBuffer); consumerRunnables.put(client, System.currentTimeMillis()); try { @@ -205,5 +222,15 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } } + + @Override + public void handleException(Throwable t) { + super.handleException(t); + } + + @Override + public void handleException(String message, Throwable t) { + super.handleException(message, t); + } } diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java index 8839045..faf6d63 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java @@ -170,8 +170,11 @@ public class MllpSocketBuffer { endOfBlockIndex = -1; } - public synchronized void readFrom(Socket socket) throws MllpSocketException, SocketTimeoutException { + readFrom(socket, endpoint.getConfiguration().getReceiveTimeout(), endpoint.getConfiguration().getReadTimeout()); + } + + public synchronized void readFrom(Socket socket, int receiveTimeout, int readTimeout) throws MllpSocketException, SocketTimeoutException { log.trace("Entering readFrom ..."); if (socket != null && socket.isConnected() && !socket.isClosed()) { ensureCapacity(MIN_BUFFER_SIZE); @@ -179,11 +182,11 @@ public class MllpSocketBuffer { try { InputStream socketInputStream = socket.getInputStream(); - socket.setSoTimeout(endpoint.getConfiguration().getReceiveTimeout()); + socket.setSoTimeout(receiveTimeout); readSocketInputStream(socketInputStream, socket); if (!hasCompleteEnvelope()) { - socket.setSoTimeout(endpoint.getConfiguration().getReadTimeout()); + socket.setSoTimeout(readTimeout); while (!hasCompleteEnvelope()) { ensureCapacity(Math.max(MIN_BUFFER_SIZE, socketInputStream.available())); @@ -249,7 +252,7 @@ public class MllpSocketBuffer { log.trace("Exiting writeTo ..."); } - public synchronized byte toByteArray()[] { + public synchronized byte[] toByteArray() { if (availableByteCount > 0) { return Arrays.copyOf(buffer, availableByteCount); } @@ -257,7 +260,7 @@ public class MllpSocketBuffer { return null; } - public synchronized byte toByteArrayAndReset()[] { + public synchronized byte[] toByteArrayAndReset() { byte[] answer = toByteArray(); reset(); @@ -293,6 +296,22 @@ public class MllpSocketBuffer { return ""; } + public synchronized String toStringAndReset() { + String answer = toString(); + + reset(); + + return answer; + } + + public synchronized String toStringAndReset(String charsetName) { + String answer = toString(charsetName); + + reset(); + + return answer; + } + /** * Convert the entire contents of the buffer (including enveloping characters) to a print-friendly * String representation. @@ -534,8 +553,10 @@ public class MllpSocketBuffer { } void updateIndexes(int b, int indexOffset) { - if (startOfBlockIndex < 0 && b == MllpProtocolConstants.START_OF_BLOCK) { - startOfBlockIndex = availableByteCount + indexOffset; + if (startOfBlockIndex < 0) { + if (b == MllpProtocolConstants.START_OF_BLOCK) { + startOfBlockIndex = availableByteCount + indexOffset; + } } else if (endOfBlockIndex < 0 && b == MllpProtocolConstants.END_OF_BLOCK) { endOfBlockIndex = availableByteCount + indexOffset; } diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptRunnable.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java similarity index 88% rename from components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptRunnable.java rename to components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java index 568a56a..f126286 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptRunnable.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java @@ -31,16 +31,16 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; /** - * Runnable to handle the ServerSocket.accept requests + * Thread to handle the ServerSocket.accept requests, and submit the sockets to the accept executor for validation. */ -public class TcpServerAcceptRunnable implements Runnable { +public class TcpServerAcceptThread extends Thread { Logger log = LoggerFactory.getLogger(this.getClass()); MllpTcpServerConsumer consumer; ServerSocket serverSocket; boolean running; - public TcpServerAcceptRunnable(MllpTcpServerConsumer consumer, ServerSocket serverSocket) { + public TcpServerAcceptThread(MllpTcpServerConsumer consumer, ServerSocket serverSocket) { this.consumer = consumer; this.serverSocket = serverSocket; } @@ -97,7 +97,6 @@ public class TcpServerAcceptRunnable implements Runnable { Socket socket = null; try { socket = serverSocket.accept(); - consumer.getEndpoint().updateLastConnectionEstablishedTicks(); } catch (SocketTimeoutException timeoutEx) { // Didn't get a new connection - keep waiting for one log.debug("Timeout waiting for client connection - keep listening"); @@ -126,7 +125,7 @@ public class TcpServerAcceptRunnable implements Runnable { if (MllpSocketBuffer.isConnectionValid(socket)) { // Try and avoid starting client threads for things like security scans and load balancer probes - consumer.startConsumer(socket); + consumer.validateConsumer(socket); } } } finally { @@ -144,7 +143,18 @@ public class TcpServerAcceptRunnable implements Runnable { } } - public void stop() { - running = false; + @Override + public void interrupt() { + this.running = false; + super.interrupt(); + if (null != serverSocket) { + if (serverSocket.isBound()) { + try { + serverSocket.close(); + } catch (IOException ioEx) { + log.warn("Exception encountered closing ServerSocket in interrupt() method - ignoring", ioEx); + } + } + } } } diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerConsumerValidationRunnable.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerConsumerValidationRunnable.java new file mode 100644 index 0000000..066b3db --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerConsumerValidationRunnable.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.mllp.internal; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.SocketTimeoutException; + +import org.apache.camel.Route; +import org.apache.camel.component.mllp.MllpSocketException; +import org.apache.camel.component.mllp.MllpTcpServerConsumer; +import org.apache.camel.impl.MDCUnitOfWork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * Runnable to handle the ServerSocket.accept requests + */ +public class TcpServerConsumerValidationRunnable implements Runnable { + final Socket clientSocket; + final MllpSocketBuffer mllpBuffer; + + Logger log = LoggerFactory.getLogger(this.getClass()); + MllpTcpServerConsumer consumer; + + private final String localAddress; + private final String remoteAddress; + private final String combinedAddress; + + public TcpServerConsumerValidationRunnable(MllpTcpServerConsumer consumer, Socket clientSocket, MllpSocketBuffer mllpBuffer) { + this.consumer = consumer; + // this.setName(createThreadName(clientSocket)); + this.clientSocket = clientSocket; + + SocketAddress localSocketAddress = clientSocket.getLocalSocketAddress(); + if (localSocketAddress != null) { + localAddress = localSocketAddress.toString(); + } else { + localAddress = null; + } + + SocketAddress remoteSocketAddress = clientSocket.getRemoteSocketAddress(); + if (remoteSocketAddress != null) { + remoteAddress = remoteSocketAddress.toString(); + } else { + remoteAddress = null; + } + + combinedAddress = MllpSocketBuffer.formatAddressString(remoteSocketAddress, localSocketAddress); + + + try { + if (consumer.getConfiguration().hasKeepAlive()) { + this.clientSocket.setKeepAlive(consumer.getConfiguration().getKeepAlive()); + } + if (consumer.getConfiguration().hasTcpNoDelay()) { + this.clientSocket.setTcpNoDelay(consumer.getConfiguration().getTcpNoDelay()); + } + if (consumer.getConfiguration().hasReceiveBufferSize()) { + this.clientSocket.setReceiveBufferSize(consumer.getConfiguration().getReceiveBufferSize()); + } + if (consumer.getConfiguration().hasSendBufferSize()) { + this.clientSocket.setSendBufferSize(consumer.getConfiguration().getSendBufferSize()); + } + + this.clientSocket.setSoLinger(false, -1); + + // Initial Read Timeout + this.clientSocket.setSoTimeout(consumer.getConfiguration().getReceiveTimeout()); + } catch (IOException initializationException) { + throw new IllegalStateException("Failed to initialize " + this.getClass().getSimpleName(), initializationException); + } + + if (mllpBuffer == null) { + this.mllpBuffer = new MllpSocketBuffer(consumer.getEndpoint()); + } else { + this.mllpBuffer = mllpBuffer; + } + } + + /** + * derive a thread name from the class name, the component URI and the connection information + * <p/> + * The String will in the format <class name>[endpoint key] - [local socket address] -> [remote socket address] + * + * @return the thread name + */ + String createThreadName(Socket socket) { + // Get the URI without options + String fullEndpointKey = consumer.getEndpoint().getEndpointKey(); + String endpointKey; + if (fullEndpointKey.contains("?")) { + endpointKey = fullEndpointKey.substring(0, fullEndpointKey.indexOf('?')); + } else { + endpointKey = fullEndpointKey; + } + + // Now put it all together + return String.format("%s[%s] - %s", this.getClass().getSimpleName(), endpointKey, combinedAddress); + } + + /** + * Do the initial read on the Socket and try to determine if it has HL7 data, junk, or nothing. + */ + @Override + public void run() { + String originalThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(createThreadName(clientSocket)); + MDC.put(MDCUnitOfWork.MDC_CAMEL_CONTEXT_ID, consumer.getEndpoint().getCamelContext().getName()); + + Route route = consumer.getRoute(); + if (route != null) { + String routeId = route.getId(); + if (routeId != null) { + MDC.put(MDCUnitOfWork.MDC_ROUTE_ID, route.getId()); + } + } + + log.debug("Checking {} for data", combinedAddress); + + try { + mllpBuffer.readFrom(clientSocket, 500, 50); + if (mllpBuffer.hasCompleteEnvelope() || mllpBuffer.hasStartOfBlock()) { + consumer.startConsumer(clientSocket, mllpBuffer); + } else if (!mllpBuffer.isEmpty()) { + // We have some leading out-of-band data but no START_OF_BLOCK + log.info("Ignoring out-of-band data on initial read: {}", mllpBuffer.toStringAndReset()); + mllpBuffer.resetSocket(clientSocket); + } + } catch (MllpSocketException socketEx) { + // TODO: The socket is invalid for some reason + if (!mllpBuffer.isEmpty()) { + log.warn("Exception encountered receiving complete message: ", mllpBuffer.toStringAndReset()); + } + mllpBuffer.resetSocket(clientSocket); + } catch (SocketTimeoutException timeoutEx) { + if (mllpBuffer.isEmpty()) { + log.debug("Initial read timed-out but no data was read - starting consumer"); + consumer.startConsumer(clientSocket, mllpBuffer); + } else { + log.warn("Timeout receiving complete message: {}", mllpBuffer.toStringAndReset()); + mllpBuffer.resetSocket(clientSocket); + } + } finally { + Thread.currentThread().setName(originalThreadName); + } + } + + public void closeSocket() { + mllpBuffer.closeSocket(clientSocket); + } + + public void closeSocket(String logMessage) { + mllpBuffer.closeSocket(clientSocket, logMessage); + } + + public void resetSocket() { + mllpBuffer.resetSocket(clientSocket); + } + + public void resetSocket(String logMessage) { + mllpBuffer.resetSocket(clientSocket, logMessage); + } + + public String getLocalAddress() { + return localAddress; + } + + public String getRemoteAddress() { + return remoteAddress; + } + + public String getCombinedAddress() { + return combinedAddress; + } + +} diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java index 0f0aa56..8940867 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java @@ -61,7 +61,7 @@ public class TcpSocketConsumerRunnable implements Runnable { private final String remoteAddress; private final String combinedAddress; - public TcpSocketConsumerRunnable(MllpTcpServerConsumer consumer, Socket clientSocket) { + public TcpSocketConsumerRunnable(MllpTcpServerConsumer consumer, Socket clientSocket, MllpSocketBuffer mllpBuffer) { this.consumer = consumer; // this.setName(createThreadName(clientSocket)); this.clientSocket = clientSocket; @@ -105,7 +105,11 @@ public class TcpSocketConsumerRunnable implements Runnable { throw new IllegalStateException("Failed to initialize " + this.getClass().getSimpleName(), initializationException); } - mllpBuffer = new MllpSocketBuffer(consumer.getEndpoint()); + if (mllpBuffer == null) { + this.mllpBuffer = new MllpSocketBuffer(consumer.getEndpoint()); + } else { + this.mllpBuffer = mllpBuffer; + } } /** @@ -448,6 +452,7 @@ public class TcpSocketConsumerRunnable implements Runnable { throw runtimeEx; } catch (Exception ex) { log.error("Unexpected exception processing exchange", ex); + exchange.setException(ex); } } catch (Exception uowEx) { // TODO: Handle this correctly @@ -483,8 +488,15 @@ public class TcpSocketConsumerRunnable implements Runnable { log.debug("Starting {} for {}", this.getClass().getSimpleName(), combinedAddress); try { + byte[] hl7MessageBytes = null; + if (mllpBuffer.hasCompleteEnvelope()) { + // If we got a complete message on the validation read, process it + hl7MessageBytes = mllpBuffer.toMllpPayload(); + mllpBuffer.reset(); + processMessage(hl7MessageBytes); + } + while (running && null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) { - byte[] hl7MessageBytes = null; log.debug("Checking for data ...."); try { mllpBuffer.readFrom(clientSocket); @@ -519,31 +531,16 @@ public class TcpSocketConsumerRunnable implements Runnable { consumer.getEndpoint().doConnectionClose(clientSocket, true, log); } } - log.info("No data received - ignoring timeout"); + log.debug("No data received - ignoring timeout"); } else { mllpBuffer.resetSocket(clientSocket); - if (consumer.getEndpoint().isBridgeErrorHandler()) { - Exchange exchange = consumer.getEndpoint().createExchange(ExchangePattern.InOut); - exchange.setException(new MllpInvalidMessageException("Timeout receiving complete payload", mllpBuffer.toByteArray())); - log.warn("Exception encountered reading payload - sending exception to route", exchange.getException()); - try { - consumer.getProcessor().process(exchange); - } catch (Exception e) { - log.error("Exception encountered processing exchange with exception encounter reading payload", e); - } - } else { - log.error("Timeout receiving complete payload", new MllpInvalidMessageException("Timeout receiving complete payload", mllpBuffer.toByteArray(), timeoutEx)); - } + new MllpInvalidMessageException("Timeout receiving complete message payload", mllpBuffer.toByteArrayAndReset(), timeoutEx); + consumer.handleException(new MllpInvalidMessageException("Timeout receiving complete message payload", mllpBuffer.toByteArrayAndReset(), timeoutEx)); } } catch (MllpSocketException mllpSocketEx) { + mllpBuffer.resetSocket(clientSocket); if (!mllpBuffer.isEmpty()) { - Exchange exchange = consumer.getEndpoint().createExchange(ExchangePattern.InOut); - exchange.setException(new MllpReceiveException("Exception encountered reading payload", mllpBuffer.toByteArrayAndReset(), mllpSocketEx)); - try { - consumer.getProcessor().process(exchange); - } catch (Exception ignoredEx) { - log.error("Ingnoring exception encountered processing exchange with exception encounter reading payload", ignoredEx); - } + consumer.handleException(new MllpReceiveException("Exception encountered reading payload", mllpBuffer.toByteArrayAndReset(), mllpSocketEx)); } else { log.warn("Ignoring exception encountered checking for data", mllpSocketEx); } @@ -558,6 +555,8 @@ public class TcpSocketConsumerRunnable implements Runnable { Thread.currentThread().setName(originalThreadName); MDC.remove(MDCUnitOfWork.MDC_ROUTE_ID); MDC.remove(MDCUnitOfWork.MDC_CAMEL_CONTEXT_ID); + + mllpBuffer.resetSocket(clientSocket); } } diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java index 9121136..0539e2e 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java @@ -42,7 +42,10 @@ import org.junit.Rule; import org.junit.Test; public class MllpTcpClientProducerIdleConnectionTimeoutTest extends CamelTestSupport { - static final int IDLE_TIMEOUT = 10000; + static final int CONNECT_TIMEOUT = 500; + static final int RECEIVE_TIMEOUT = 1000; + static final int READ_TIMEOUT = 500; + static final int IDLE_TIMEOUT = RECEIVE_TIMEOUT * 3; @Rule public MllpServerResource mllpServer = new MllpServerResource("localhost", AvailablePortFinder.getNextAvailable()); @@ -90,7 +93,8 @@ public class MllpTcpClientProducerIdleConnectionTimeoutTest extends CamelTestSup from(source.getDefaultEndpoint()).routeId(routeId) .log(LoggingLevel.INFO, routeId, "Sending Message") - .toF("mllp://%s:%d?idleTimeout=%s", mllpServer.getListenHost(), mllpServer.getListenPort(), IDLE_TIMEOUT) + .toF("mllp://%s:%d?connectTimeout=%d&receiveTimeout=%d&readTimeout=%d&idleTimeout=%s", mllpServer.getListenHost(), mllpServer.getListenPort(), + CONNECT_TIMEOUT, RECEIVE_TIMEOUT, READ_TIMEOUT, IDLE_TIMEOUT) .log(LoggingLevel.INFO, routeId, "Received Acknowledgement") .to(complete); } diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java index b7b4d99..c366465 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java @@ -40,7 +40,8 @@ import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.instanceOf; public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport { - static final int RECEIVE_TIMEOUT = 500; + static final int RECEIVE_TIMEOUT = 1000; + static final int READ_TIMEOUT = 500; @Rule public MllpClientResource mllpClient = new MllpClientResource(); @@ -68,7 +69,7 @@ public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport { String routeId = "mllp-receiver"; public void configure() { - fromF("mllp://%s:%d?autoAck=false", mllpClient.getMllpHost(), mllpClient.getMllpPort()) + fromF("mllp://%s:%d?receiveTimeout=%d&readTimeout=%d&autoAck=false", mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT, READ_TIMEOUT) .log(LoggingLevel.INFO, routeId, "Receiving: ${body}") .to(result); } diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java index 3645314..62140591 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java @@ -17,6 +17,11 @@ package org.apache.camel.component.mllp; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.test.mllp.Hl7TestMessageGenerator; + public class MllpTcpServerConsumerOptionalEndOfDataWithValidationTest extends TcpServerConsumerEndOfDataAndValidationTestSupport { @Override @@ -62,7 +67,13 @@ public class MllpTcpServerConsumerOptionalEndOfDataWithValidationTest extends Tc public void testMessageContainingEmbeddedEndOfBlock() throws Exception { expectedInvalidCount = 1; - runMessageContainingEmbeddedEndOfBlock(); + setExpectedCounts(); + + NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create(); + + mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN", "EVN" + MllpProtocolConstants.END_OF_BLOCK)); + + assertTrue("Exchange should have completed", done.matches(5, TimeUnit.SECONDS)); } @Override diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java index 66087da..cc1c7d4 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java @@ -17,6 +17,9 @@ package org.apache.camel.component.mllp; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.NotifyBuilder; import org.apache.camel.component.mllp.internal.Hl7Util; import org.apache.camel.test.mllp.Hl7TestMessageGenerator; @@ -44,6 +47,8 @@ public class MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest extends @Override public void testNthInvalidMessage() throws Exception { + expectedFailedCount = 1; + runNthInvalidMessage(); } @@ -65,8 +70,13 @@ public class MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest extends public void testMessageContainingEmbeddedEndOfBlock() throws Exception { expectedCompleteCount = 1; - runMessageContainingEmbeddedEndOfBlock(); - } + setExpectedCounts(); + + NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create(); + + mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN", "EVN" + MllpProtocolConstants.END_OF_BLOCK)); + + assertTrue("Exchange should have completed", done.matches(5, TimeUnit.SECONDS)); } @Override public void testNthMessageContainingEmbeddedEndOfBlock() throws Exception { diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java index d8bee66..bb7666f 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java @@ -17,6 +17,11 @@ package org.apache.camel.component.mllp; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.test.mllp.Hl7TestMessageGenerator; + public class MllpTcpServerConsumerRequiredEndOfDataWithValidationTest extends TcpServerConsumerEndOfDataAndValidationTestSupport { @Override @@ -59,9 +64,15 @@ public class MllpTcpServerConsumerRequiredEndOfDataWithValidationTest extends Tc @Override public void testMessageContainingEmbeddedEndOfBlock() throws Exception { - expectedInvalidCount = 1; + //expectedInvalidCount = 1; + + setExpectedCounts(); + + NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create(); + + mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN", "EVN" + MllpProtocolConstants.END_OF_BLOCK)); - runMessageContainingEmbeddedEndOfBlock(); + assertFalse("Exchange should not have completed", done.matches(5, TimeUnit.SECONDS)); } @Override diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java index 458a7bf..4abe439 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java @@ -37,11 +37,15 @@ public class MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest extends @Override public void testInvalidMessage() throws Exception { + expectedFailedCount = 1; + runNthInvalidMessage(); } @Override public void testNthInvalidMessage() throws Exception { + expectedFailedCount = 1; + runNthInvalidMessage(); } @@ -61,9 +65,13 @@ public class MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest extends @Override public void testMessageContainingEmbeddedEndOfBlock() throws Exception { - expectedInvalidCount = 1; + setExpectedCounts(); + + NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create(); + + mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN", "EVN" + MllpProtocolConstants.END_OF_BLOCK)); - runMessageContainingEmbeddedEndOfBlock(); + assertFalse("Exchange should not have completed", done.matches(5, TimeUnit.SECONDS)); } @Override diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java index f659685..4893f7e 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java @@ -35,6 +35,9 @@ import org.junit.Rule; import org.junit.Test; public abstract class TcpClientProducerEndOfDataAndValidationTestSupport extends CamelTestSupport { + static final int RECEIVE_TIMEOUT = 1000; + static final int READ_TIMEOUT = 500; + static final String TEST_MESSAGE = "MSH|^~\\&|ADT|EPIC|JCAPS|CC|20161206193919|RISTECH|ADT^A08|00001|D|2.3^^|||||||" + '\r' + "EVN|A08|20150107161440||REG_UPDATE_SEND_VISIT_MESSAGES_ON_PATIENT_CHANGES|RISTECH^RADIOLOGY^TECHNOLOGIST^^^^^^UCLA^^^^^RRMC||" + '\r' @@ -158,7 +161,8 @@ public abstract class TcpClientProducerEndOfDataAndValidationTestSupport extends from(source.getDefaultEndpoint()).routeId(routeId) .log(LoggingLevel.INFO, routeId, "Sending Message") - .toF("mllp://%s:%d?validatePayload=%b&requireEndOfData=%b", mllpServer.getListenHost(), mllpServer.getListenPort(), validatePayload(), requireEndOfData()) + .toF("mllp://%s:%d?receiveTimeout=%d&readTimeout=%d&validatePayload=%b&requireEndOfData=%b", mllpServer.getListenHost(), mllpServer.getListenPort(), + RECEIVE_TIMEOUT, READ_TIMEOUT, validatePayload(), requireEndOfData()) .log(LoggingLevel.INFO, routeId, "Received Acknowledgement") .to(aa); } diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java index 094ff08..fe6e6f7 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java @@ -17,6 +17,9 @@ package org.apache.camel.component.mllp; +import static org.hamcrest.CoreMatchers.instanceOf; + +import java.net.SocketException; import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; @@ -30,7 +33,6 @@ import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit.rule.mllp.MllpClientResource; import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException; -import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceTimeoutException; import org.apache.camel.test.junit4.CamelTestSupport; import org.apache.camel.test.mllp.Hl7TestMessageGenerator; import org.junit.Rule; @@ -38,7 +40,8 @@ import org.junit.Test; public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport extends CamelTestSupport { static final int CONNECT_TIMEOUT = 500; - static final int RESPONSE_TIMEOUT = 5000; + static final int RECEIVE_TIMEOUT = 1000; + static final int READ_TIMEOUT = 500; @Rule public MllpClientResource mllpClient = new MllpClientResource(); @@ -46,10 +49,14 @@ public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport extends @EndpointInject(uri = "mock://complete") MockEndpoint complete; + @EndpointInject(uri = "mock://failed") + MockEndpoint failed; + @EndpointInject(uri = "mock://invalid-ex") MockEndpoint invalid; int expectedCompleteCount; + int expectedFailedCount; int expectedInvalidCount; @Override @@ -76,8 +83,11 @@ public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport extends onException(MllpInvalidMessageException.class) .to(invalid); - fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d&validatePayload=%b&requireEndOfData=%b", - mllpClient.getMllpHost(), mllpClient.getMllpPort(), CONNECT_TIMEOUT, RESPONSE_TIMEOUT, validatePayload(), requireEndOfData()) + onCompletion().onFailureOnly() + .to(failed); + + fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d&readTimeout=%d&validatePayload=%b&requireEndOfData=%b", + mllpClient.getMllpHost(), mllpClient.getMllpPort(), CONNECT_TIMEOUT, RECEIVE_TIMEOUT, READ_TIMEOUT, validatePayload(), requireEndOfData()) .routeId(routeId) .log(LoggingLevel.INFO, routeId, "Test route received message") .to(complete); @@ -91,6 +101,7 @@ public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport extends protected void setExpectedCounts() { complete.expectedMessageCount(expectedCompleteCount); + failed.expectedMessageCount(expectedFailedCount); invalid.expectedMessageCount(expectedInvalidCount); } @@ -160,7 +171,7 @@ public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport extends log.info("Sending TEST_MESSAGE_2"); String acknowledgement2 = mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(2)); - assertTrue("First two normal exchanges did not complete", notify1.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS)); + assertTrue("First two normal exchanges did not complete", notify1.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS)); log.info("Sending TEST_MESSAGE_3"); mllpClient.setSendEndOfBlock(false); @@ -183,7 +194,7 @@ public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport extends log.info("Sending TEST_MESSAGE_5"); String acknowledgement5 = mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(5)); - assertTrue("Remaining exchanges did not complete", notify2.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS)); + assertTrue("Remaining exchanges did not complete", notify2.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS)); assertMockEndpointsSatisfied(10, TimeUnit.SECONDS); @@ -207,27 +218,47 @@ public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport extends // Send one message to establish the connection and start the ConsumerClientSocketThread mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage()); - assertTrue("One exchange should have completed", oneDone.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS)); + assertTrue("One exchange should have completed", oneDone.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS)); mllpClient.setSendEndOfBlock(false); mllpClient.setSendEndOfData(false); - mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage()); + mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage()); - assertTrue("Two exchanges should have completed", twoDone.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS)); + assertTrue("Two exchanges should have completed", twoDone.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS)); } @Test public void testInitialMessageReadTimeout() throws Exception { - expectedInvalidCount = 1; + expectedCompleteCount = 1; setExpectedCounts(); mllpClient.setSendEndOfBlock(false); mllpClient.setSendEndOfData(false); - mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage()); + log.info("Sending first message"); + mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage(10001)); + + Thread.sleep(RECEIVE_TIMEOUT * 5); + + mllpClient.setSendEndOfBlock(true); + mllpClient.setSendEndOfData(true); + + try { + log.info("Attempting to send second message"); + String acknowledgement = mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(10002)); + assertEquals("If the send doesn't throw an exception, the acknowledgement should be empty", "", acknowledgement); + } catch (MllpJUnitResourceException expected) { + assertThat("If the send throws an exception, the cause should be a SocketException", expected.getCause(), instanceOf(SocketException.class)); + } + + mllpClient.disconnect(); + mllpClient.connect(); + + log.info("Sending third message"); + String acknowledgement = mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(10003)); } @Test @@ -306,16 +337,6 @@ public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport extends @Test public abstract void testMessageContainingEmbeddedEndOfBlock() throws Exception; - protected void runMessageContainingEmbeddedEndOfBlock() throws Exception { - setExpectedCounts(); - - NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create(); - - mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN", "EVN" + MllpProtocolConstants.END_OF_BLOCK)); - - assertTrue("Exchange should have completed", done.matches(15, TimeUnit.SECONDS)); - } - @Test public abstract void testInvalidMessageContainingEmbeddedEndOfBlock() throws Exception; diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java index 092be01..cd87fc6 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java @@ -403,7 +403,7 @@ public class MllpSocketBufferTest extends SocketBufferTestSupport { assertFalse("Unexpected initial value", instance.hasEndOfBlock()); instance.write(MllpProtocolConstants.END_OF_BLOCK); - assertTrue(instance.hasEndOfBlock()); + assertFalse("START_OF_BLOCK before an END_OF_BLOCK", instance.hasEndOfBlock()); instance.reset(); assertFalse(instance.hasEndOfBlock()); @@ -457,7 +457,7 @@ public class MllpSocketBufferTest extends SocketBufferTestSupport { assertFalse(instance.hasEndOfData()); instance.write(MllpProtocolConstants.END_OF_DATA); - assertTrue(instance.hasEndOfData()); + assertFalse("Need a START_OF_BLOCK before the END_OF_DATA", instance.hasEndOfData()); instance.reset(); assertFalse(instance.hasEndOfData()); diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java index 1f21bbf..903669e 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java @@ -57,7 +57,7 @@ public class MllpSocketBufferWriteTest extends SocketBufferTestSupport { assertEquals(1, instance.size()); assertEquals(-1, instance.startOfBlockIndex); - assertEquals(0, instance.endOfBlockIndex); + assertEquals(-1, instance.endOfBlockIndex); } /** @@ -142,7 +142,7 @@ public class MllpSocketBufferWriteTest extends SocketBufferTestSupport { assertEquals(6, instance.size()); assertEquals(-1, instance.startOfBlockIndex); - assertEquals(4, instance.endOfBlockIndex); + assertEquals(-1, instance.endOfBlockIndex); } /** diff --git a/components/camel-mllp/src/test/resources/log4j2.properties b/components/camel-mllp/src/test/resources/log4j2.properties index 5c96df2..4179677 100644 --- a/components/camel-mllp/src/test/resources/log4j2.properties +++ b/components/camel-mllp/src/test/resources/log4j2.properties @@ -31,4 +31,4 @@ rootLogger.appenderRef.file.ref = file loggers = mllp logger.mllp.name = org.apache.camel.component.mllp -# logger.mllp.level = TRACE +logger.mllp.level = DEBUG -- To stop receiving notification emails like this one, please contact qu...@apache.org.