CAMEL-10242 - added support for connection timeout
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4b545ae7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4b545ae7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4b545ae7 Branch: refs/heads/master Commit: 4b545ae7fd407f0df755e6ea40d9657f705b8ad0 Parents: 9a5778b Author: Quinn Stevenson <qu...@pronoia-solutions.com> Authored: Tue Sep 6 12:11:05 2016 -0600 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Sep 6 21:22:33 2016 +0200 ---------------------------------------------------------------------- .../src/main/docs/mllp-component.adoc | 5 +- .../camel/component/mllp/MllpEndpoint.java | 49 ++++++ .../component/mllp/MllpTcpClientProducer.java | 4 +- .../component/mllp/MllpTcpServerConsumer.java | 31 +++- .../camel/component/mllp/impl/MllpUtil.java | 13 +- .../MllpTcpServerConsumerConnectionTest.java | 104 ++++++++---- ...MllpTcpServerConsumerMessageHeadersTest.java | 158 +++++++++++++++++++ .../src/test/resources/log4j2.properties | 4 + 8 files changed, 327 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/main/docs/mllp-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/docs/mllp-component.adoc b/components/camel-mllp/src/main/docs/mllp-component.adoc index a96184c..2c776b1 100644 --- a/components/camel-mllp/src/main/docs/mllp-component.adoc +++ b/components/camel-mllp/src/main/docs/mllp-component.adoc @@ -55,7 +55,7 @@ The MLLP component has no options. // endpoint options: START -The MLLP component supports 19 endpoint options which are listed below: +The MLLP component supports 22 endpoint options which are listed below: {% raw %} [width="100%",cols="2,1,1m,1m,5",options="header"] @@ -64,6 +64,7 @@ The MLLP component supports 19 endpoint options which are listed below: | hostname | common | | String | *Required* Hostname or IP for connection for the TCP connection. The default value is null which means any local IP address | port | common | | int | *Required* Port number for the TCP connection | autoAck | common | true | boolean | Enable/Disable the automatic generation of a MLLP Acknowledgement MLLP Consumers only +| hl7Headers | common | true | boolean | Enable/Disable the automatic generation of message headers from the HL7 Message MLLP Consumers only | keepAlive | common | true | boolean | Enable/disable the SO_KEEPALIVE socket option. | reuseAddress | common | false | boolean | Enable/disable the SO_REUSEADDR socket option. | tcpNoDelay | common | true | boolean | Enable/disable the TCP_NODELAY socket option. @@ -79,6 +80,8 @@ The MLLP component supports 19 endpoint options which are listed below: | bindRetryInterval | timeout | 5000 | int | TCP Server Only - The number of milliseconds to wait between bind attempts | bindTimeout | timeout | 30000 | int | TCP Server Only - The number of milliseconds to retry binding to a server port | connectTimeout | timeout | 30000 | int | Timeout value for establishing for a TCP connection TCP Client only +| maxReceiveTimeouts | timeout | -1 | int | The maximum number of timeouts (specified by receiveTimeout) allowed before the TCP Connection will be reset. +| readTimeout | timeout | 500 | int | The SO_TIMEOUT value used after the start of an MLLP frame has been received | receiveTimeout | timeout | 10000 | int | The SO_TIMEOUT value used when waiting for the start of an MLLP frame |======================================================================= {% endraw %} http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java index 6a85b9c..7a3c195 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java @@ -72,6 +72,12 @@ public class MllpEndpoint extends DefaultEndpoint { @UriParam(label = "timeout", defaultValue = "10000") int receiveTimeout = 10000; + @UriParam(label = "timeout", defaultValue = "-1") + int maxReceiveTimeouts = -1; + + @UriParam(label = "timeout", defaultValue = "500") + int readTimeout = 500; + @UriParam(defaultValue = "true") boolean keepAlive = true; @@ -90,6 +96,9 @@ public class MllpEndpoint extends DefaultEndpoint { @UriParam(defaultValue = "true") boolean autoAck = true; + @UriParam(defaultValue = "true") + boolean hl7Headers = true; + @UriParam(label = "codec") String charsetName; @@ -261,6 +270,32 @@ public class MllpEndpoint extends DefaultEndpoint { this.receiveTimeout = receiveTimeout; } + public int getMaxReceiveTimeouts() { + return maxReceiveTimeouts; + } + + /** + * The maximum number of timeouts (specified by receiveTimeout) allowed before the TCP Connection will be reset. + * + * @param maxReceiveTimeouts maximum number of receiveTimeouts + */ + public void setMaxReceiveTimeouts(int maxReceiveTimeouts) { + this.maxReceiveTimeouts = maxReceiveTimeouts; + } + + public int getReadTimeout() { + return readTimeout; + } + + /** + * The SO_TIMEOUT value used after the start of an MLLP frame has been received + * + * @param readTimeout timeout in milliseconds + */ + public void setReadTimeout(int readTimeout) { + this.readTimeout = readTimeout; + } + public boolean isKeepAlive() { return keepAlive; } @@ -341,4 +376,18 @@ public class MllpEndpoint extends DefaultEndpoint { this.autoAck = autoAck; } + public boolean isHl7Headers() { + return hl7Headers; + } + + /** + * Enable/Disable the automatic generation of message headers from the HL7 Message + * + * MLLP Consumers only + * + * @param hl7Headers enabled if true, otherwise disabled + */ + public void setHl7Headers(boolean hl7Headers) { + this.hl7Headers = hl7Headers; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java index 2e74f6c..da04286 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java @@ -107,8 +107,8 @@ public class MllpTcpClientProducer extends DefaultProducer { log.debug("Reading acknowledgement from external system"); byte[] acknowledgementBytes = null; try { - if (MllpUtil.openFrame(socket)) { - acknowledgementBytes = MllpUtil.closeFrame(socket); + if (MllpUtil.openFrame(socket, endpoint.receiveTimeout, endpoint.readTimeout)) { + acknowledgementBytes = MllpUtil.closeFrame(socket, endpoint.receiveTimeout, endpoint.readTimeout); } } catch (SocketTimeoutException timeoutEx) { exchange.setException(new MllpAcknowledgementTimoutException("Acknowledgement timout", timeoutEx)); http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java index f1fa9b7..61f29db 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java @@ -325,6 +325,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } } + /** + * Nested Class read the Socket + */ class ClientSocketThread extends Thread { Socket clientSocket; Hl7AcknowledgementGenerator acknowledgementGenerator = new Hl7AcknowledgementGenerator(); @@ -346,7 +349,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { this.clientSocket.setReuseAddress(endpoint.reuseAddress); this.clientSocket.setSoLinger(false, -1); - // Read Timeout + // Initial Read Timeout this.clientSocket.setSoTimeout(endpoint.receiveTimeout); } @@ -378,6 +381,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { @Override public void run() { + int receiveTimeoutCounter = 0; while (!isInterrupted() && null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) { byte[] hl7MessageBytes = null; @@ -385,17 +389,25 @@ public class MllpTcpServerConsumer extends DefaultConsumer { log.debug("Reading data ...."); try { if (null != initialByte && START_OF_BLOCK == initialByte) { - hl7MessageBytes = MllpUtil.closeFrame(clientSocket); + hl7MessageBytes = MllpUtil.closeFrame(clientSocket, endpoint.receiveTimeout, endpoint.readTimeout); } else { try { - if (!MllpUtil.openFrame(clientSocket)) { + if (!MllpUtil.openFrame(clientSocket, endpoint.receiveTimeout, endpoint.readTimeout)) { + receiveTimeoutCounter = 0; continue; + } else { + receiveTimeoutCounter = 0; } } catch (SocketTimeoutException timeoutEx) { // When thrown by openFrame, it indicates that no data was available - but no error + if (endpoint.maxReceiveTimeouts > 0 && ++receiveTimeoutCounter >= endpoint.maxReceiveTimeouts) { + // TODO: Enhance logging?? + log.warn("Idle Client - resetting connection"); + MllpUtil.resetConnection(clientSocket); + } continue; } - hl7MessageBytes = MllpUtil.closeFrame(clientSocket); + hl7MessageBytes = MllpUtil.closeFrame(clientSocket, endpoint.receiveTimeout, endpoint.readTimeout); } } catch (MllpException mllpEx) { Exchange exchange = endpoint.createExchange(ExchangePattern.InOut); @@ -525,6 +537,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer { MllpUtil.writeFramedPayload(clientSocket, acknowledgementMessageBytes); exchange.getIn().setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementMessageBytes); exchange.getIn().setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementMessageType); + exchange.setProperty(MLLP_ACKNOWLEDGEMENT, acknowledgementMessageBytes); + exchange.setProperty(MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementMessageType); // Check AFTER_SEND Properties if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) { @@ -540,8 +554,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } - log.info("ClientSocketThread exiting"); - + log.debug("ClientSocketThread exiting"); } private void populateHl7DataHeaders(Exchange exchange, Message message, byte[] hl7MessageBytes) { @@ -567,8 +580,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer { if (-1 == endOfMSH) { // TODO: May want to throw some sort of an Exception here log.error("Population of message headers failed - unable to find the end of the MSH segment"); - } else { - log.debug("Populating the message headers"); + } else if (endpoint.hl7Headers) { + log.debug("Populating the HL7 message headers"); Charset charset = Charset.forName(IOHelper.getCharsetName(exchange)); for (int i = 2; i < fieldSeparatorIndexes.size(); ++i) { @@ -634,6 +647,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } } } + } else { + log.trace("HL7 Message headers disabled"); } } http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java index 2aab0a8..276d3ae 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java @@ -68,12 +68,13 @@ public final class MllpUtil { * @throws MllpCorruptFrameException if the MLLP Frame is corrupted in some way * @throws MllpException for other unexpected error conditions */ - public static boolean openFrame(Socket socket) throws SocketTimeoutException, MllpCorruptFrameException, MllpException { + public static boolean openFrame(Socket socket, int receiveTimeout, int readTimeout) throws SocketTimeoutException, MllpCorruptFrameException, MllpException { if (socket.isConnected() && !socket.isClosed()) { InputStream socketInputStream = MllpUtil.getInputStream(socket); int readByte = -1; try { + socket.setSoTimeout(receiveTimeout); readByte = socketInputStream.read(); switch (readByte) { case START_OF_BLOCK: @@ -110,6 +111,7 @@ public final class MllpUtil { outOfFrameData.write(readByte); try { + socket.setSoTimeout(readTimeout); while (true) { readByte = socketInputStream.read(); switch (readByte) { @@ -183,12 +185,13 @@ public final class MllpUtil { * @throws MllpCorruptFrameException if the MLLP Frame is corrupted in some way * @throws MllpException for other unexpected error conditions */ - public static byte[] closeFrame(Socket socket) throws MllpTimeoutException, MllpCorruptFrameException, MllpException { + public static byte[] closeFrame(Socket socket, int receiveTimeout, int readTimeout) throws MllpTimeoutException, MllpCorruptFrameException, MllpException { if (socket.isConnected() && !socket.isClosed()) { InputStream socketInputStream = MllpUtil.getInputStream(socket); // TODO: Come up with an intelligent way to size this stream ByteArrayOutputStream payload = new ByteArrayOutputStream(4096); try { + socket.setSoTimeout(readTimeout); while (true) { int readByte = socketInputStream.read(); switch (readByte) { @@ -226,6 +229,7 @@ public final class MllpUtil { throw new MllpCorruptFrameException("The MLLP frame was partially closed - END_OF_BLOCK was not followed by END_OF_DATA", payload.size() > 0 ? payload.toByteArray() : null); } + socket.setSoTimeout(receiveTimeout); return payload.toByteArray(); default: // log.trace( "Read Character: {}", (char)readByte ); @@ -263,6 +267,11 @@ public final class MllpUtil { } } + try { + socket.setSoTimeout(receiveTimeout); + } catch (SocketException e) { + // Eat this exception + } return null; } http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java index 99667b2..58b7d01 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java @@ -16,9 +16,7 @@ */ package org.apache.camel.component.mllp; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; +import java.net.SocketException; import java.util.concurrent.TimeUnit; import org.apache.camel.EndpointInject; @@ -26,25 +24,42 @@ import org.apache.camel.LoggingLevel; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit.rule.mllp.MllpClientResource; +import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException; import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Rule; import org.junit.Test; public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport { - String mllpHost = "localhost"; - int mllpPort = AvailablePortFinder.getNextAvailable(); + static final int RECEIVE_TIMEOUT = 500; + + @Rule + public MllpClientResource mllpClient = new MllpClientResource(); + @EndpointInject(uri = "mock://result") MockEndpoint result; @Override - protected RouteBuilder createRouteBuilder() throws Exception { - mllpPort = AvailablePortFinder.getNextAvailable(); + protected void doPreSetup() throws Exception { + mllpClient.setMllpHost("localhost"); + mllpClient.setMllpPort(AvailablePortFinder.getNextAvailable()); + + super.doPreSetup(); + } + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { String routeId = "mllp-receiver"; public void configure() { - fromF("mllp://%s:%d?autoAck=false", mllpHost, mllpPort) + fromF("mllp://%s:%d?autoAck=false", mllpClient.getMllpHost(), mllpClient.getMllpPort()) .log(LoggingLevel.INFO, routeId, "Receiving: ${body}") .to(result); } @@ -69,33 +84,66 @@ public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport { */ @Test public void testConnectWithoutData() throws Exception { - result.setExpectedCount(0); int connectionCount = 10; + long connectionMillis = 200; + + result.setExpectedCount(0); + + addTestRoute(-1); + + for (int i = 1; i <= connectionCount; ++i) { + mllpClient.connect(); + Thread.sleep(connectionMillis); + mllpClient.close(); + } + + assertMockEndpointsSatisfied(15, TimeUnit.SECONDS); + } + + /** + * Simulate an Idle Client + * + * @throws Exception + */ + @Test + public void testIdleConnection() throws Exception { + final int maxReceiveTimeouts = 3; + String testMessage = "MSH|^~\\&|ADT|EPIC|JCAPS|CC|20160902123950|RISTECH|ADT^A08|00001|D|2.3|||||||" + '\r' + '\n'; + + result.setExpectedCount(1); + result.setAssertPeriod(1000); + + addTestRoute(maxReceiveTimeouts); + + mllpClient.connect(); + mllpClient.sendMessageAndWaitForAcknowledgement(testMessage); + Thread.sleep(RECEIVE_TIMEOUT * (maxReceiveTimeouts + 1)); - Socket dummyLoadBalancerSocket = null; - SocketAddress address = new InetSocketAddress(mllpHost, mllpPort); - int connectTimeout = 5000; try { - for (int i = 1; i <= connectionCount; ++i) { - log.debug("Creating connection #{}", i); - dummyLoadBalancerSocket = new Socket(); - dummyLoadBalancerSocket.connect(address, connectTimeout); - log.debug("Closing connection #{}", i); - dummyLoadBalancerSocket.close(); - Thread.sleep(1000); - } - } finally { - if (null != dummyLoadBalancerSocket) { - try { - dummyLoadBalancerSocket.close(); - } catch (Exception ex) { - log.warn("Exception encountered closing dummy load balancer socket", ex); - } - } + mllpClient.sendMessageAndWaitForAcknowledgement(testMessage); + fail("The MllpClientResource should have thrown an exception when writing to the reset socket"); + } catch (MllpJUnitResourceException ex) { + Throwable cause = ex.getCause(); + assertIsInstanceOf(SocketException.class, cause); + assertEquals("Broken pipe", cause.getMessage()); } assertMockEndpointsSatisfied(15, TimeUnit.SECONDS); } + void addTestRoute(int maxReceiveTimeouts) throws Exception { + RouteBuilder builder = new RouteBuilder() { + String routeId = "mllp-receiver"; + + public void configure() { + fromF("mllp://%s:%d?receiveTimeout=%d&maxReceiveTimeouts=%d", mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT, maxReceiveTimeouts) + .log(LoggingLevel.INFO, routeId, "Receiving: ${body}") + .to(result); + } + }; + + context.addRoutes(builder); + context.start(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerMessageHeadersTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerMessageHeadersTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerMessageHeadersTest.java new file mode 100644 index 0000000..c0e54e4 --- /dev/null +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerMessageHeadersTest.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.EndpointInject; +import org.apache.camel.LoggingLevel; +import org.apache.camel.Message; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit.rule.mllp.MllpClientResource; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Rule; +import org.junit.Test; + +public class MllpTcpServerConsumerMessageHeadersTest extends CamelTestSupport { + @Rule + public MllpClientResource mllpClient = new MllpClientResource(); + + @EndpointInject(uri = "mock://result") + MockEndpoint result; + + @EndpointInject(uri = "mock://on-completion-result") + MockEndpoint onCompletionResult; + + @Override + protected CamelContext createCamelContext() throws Exception { + DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext(); + + context.setUseMDCLogging(true); + context.setName(this.getClass().getSimpleName()); + + return context; + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Override + protected void doPreSetup() throws Exception { + mllpClient.setMllpHost("localhost"); + mllpClient.setMllpPort(AvailablePortFinder.getNextAvailable()); + + super.doPreSetup(); + } + + @Test + public void testHl7HeadersEnabled() throws Exception { + String testMessage = "MSH|^~\\&|ADT|EPIC|JCAPS|CC|20160902123950|RISTECH|ADT^A08|00001|D|2.3|||||||" + '\r' + '\n'; + + addTestRoute(true); + + result.expectedMessageCount(1); + + result.expectedHeaderReceived(MllpConstants.MLLP_SENDING_APPLICATION, "ADT"); + result.expectedHeaderReceived(MllpConstants.MLLP_SENDING_FACILITY, "EPIC"); + result.expectedHeaderReceived(MllpConstants.MLLP_RECEIVING_APPLICATION, "JCAPS"); + result.expectedHeaderReceived(MllpConstants.MLLP_TIMESTAMP, "20160902123950"); + result.expectedHeaderReceived(MllpConstants.MLLP_SECURITY, "RISTECH"); + result.expectedHeaderReceived(MllpConstants.MLLP_MESSAGE_TYPE, "ADT^A08"); + result.expectedHeaderReceived(MllpConstants.MLLP_EVENT_TYPE, "ADT"); + result.expectedHeaderReceived(MllpConstants.MLLP_TRIGGER_EVENT, "A08"); + result.expectedHeaderReceived(MllpConstants.MLLP_MESSAGE_CONTROL, "00001"); + result.expectedHeaderReceived(MllpConstants.MLLP_PROCESSING_ID, "D"); + result.expectedHeaderReceived(MllpConstants.MLLP_VERSION_ID, "2.3"); + + mllpClient.connect(); + + mllpClient.sendMessageAndWaitForAcknowledgement(testMessage, 10000); + + assertMockEndpointsSatisfied(10, TimeUnit.SECONDS); + + Message message = result.getExchanges().get(0).getIn(); + + assertNotNull("Should have header" + MllpConstants.MLLP_LOCAL_ADDRESS, message.getHeader(MllpConstants.MLLP_LOCAL_ADDRESS)); + assertNotNull("Should have header" + MllpConstants.MLLP_REMOTE_ADDRESS, message.getHeader(MllpConstants.MLLP_REMOTE_ADDRESS)); + } + + + @Test + public void testHl7HeadersDisabled() throws Exception { + String testMessage = "MSH|^~\\&|ADT|EPIC|JCAPS|CC|20160902123950|RISTECH|ADT^A08|00001|D|2.3|||||||" + '\r' + '\n'; + + addTestRoute(false); + + result.expectedMessageCount(1); + + mllpClient.connect(); + + mllpClient.sendMessageAndWaitForAcknowledgement(testMessage, 10000); + + assertMockEndpointsSatisfied(10, TimeUnit.SECONDS); + + Message message = result.getExchanges().get(0).getIn(); + + assertNotNull("Should have header" + MllpConstants.MLLP_LOCAL_ADDRESS, message.getHeader(MllpConstants.MLLP_LOCAL_ADDRESS)); + assertNotNull("Should have header" + MllpConstants.MLLP_REMOTE_ADDRESS, message.getHeader(MllpConstants.MLLP_REMOTE_ADDRESS)); + + assertNull("Should NOT have header" + MllpConstants.MLLP_SENDING_APPLICATION, message.getHeader(MllpConstants.MLLP_SENDING_APPLICATION)); + assertNull("Should NOT have header" + MllpConstants.MLLP_SENDING_FACILITY, message.getHeader(MllpConstants.MLLP_SENDING_FACILITY)); + assertNull("Should NOT have header" + MllpConstants.MLLP_RECEIVING_APPLICATION, message.getHeader(MllpConstants.MLLP_RECEIVING_APPLICATION)); + assertNull("Should NOT have header" + MllpConstants.MLLP_TIMESTAMP, message.getHeader(MllpConstants.MLLP_TIMESTAMP)); + assertNull("Should NOT have header" + MllpConstants.MLLP_SECURITY, message.getHeader(MllpConstants.MLLP_SECURITY)); + assertNull("Should NOT have header" + MllpConstants.MLLP_MESSAGE_TYPE, message.getHeader(MllpConstants.MLLP_MESSAGE_TYPE)); + assertNull("Should NOT have header" + MllpConstants.MLLP_EVENT_TYPE, message.getHeader(MllpConstants.MLLP_EVENT_TYPE)); + assertNull("Should NOT have header" + MllpConstants.MLLP_MESSAGE_CONTROL, message.getHeader(MllpConstants.MLLP_MESSAGE_CONTROL)); + assertNull("Should NOT have header" + MllpConstants.MLLP_PROCESSING_ID, message.getHeader(MllpConstants.MLLP_PROCESSING_ID)); + assertNull("Should NOT have header" + MllpConstants.MLLP_VERSION_ID, message.getHeader(MllpConstants.MLLP_VERSION_ID)); + } + + void addTestRoute(boolean hl7Headers) throws Exception { + RouteBuilder builder = new RouteBuilder() { + int connectTimeout = 500; + int responseTimeout = 5000; + + @Override + public void configure() throws Exception { + String routeId = "mllp-test-receiver-route"; + + onCompletion() + .to("mock://on-completion-result") + .toF("log:%s?level=INFO&showAll=true", routeId) + .log(LoggingLevel.INFO, routeId, "Test route complete"); + + fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d&hl7Headers=%b", + mllpClient.getMllpHost(), mllpClient.getMllpPort(), connectTimeout, responseTimeout, hl7Headers) + .routeId(routeId) + .log(LoggingLevel.INFO, routeId, "Test route received message") + .to(result); + + } + }; + context.addRoutes(builder); + context.start(); + } + +} + http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/resources/log4j2.properties b/components/camel-mllp/src/test/resources/log4j2.properties index e1d76ef..8c321d2 100644 --- a/components/camel-mllp/src/test/resources/log4j2.properties +++ b/components/camel-mllp/src/test/resources/log4j2.properties @@ -26,3 +26,7 @@ appender.out.layout.type = PatternLayout appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n rootLogger.level = INFO rootLogger.appenderRef.file.ref = file + +# loggers = mllp +# logger.mllp.name = org.apache.camel.component.mllp +# logger.mllp.level = DEBUG