Improved Socket cleanup and added support for bindTimeout URI paramter
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8f09e4b3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8f09e4b3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8f09e4b3 Branch: refs/heads/master Commit: 8f09e4b3110c70ec9896fbd051d816c294f914c4 Parents: d3bfca9 Author: Quinn Stevenson <qu...@pronoia-solutions.com> Authored: Mon Jan 25 15:48:53 2016 -0700 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Jan 28 09:11:37 2016 +0100 ---------------------------------------------------------------------- components/camel-mllp/pom.xml | 13 -- .../camel/component/mllp/MllpEndpoint.java | 35 ++++- .../component/mllp/MllpTcpClientProducer.java | 154 ++++++++++--------- .../component/mllp/MllpTcpServerConsumer.java | 50 +++++- .../camel/component/mllp/impl/MllpUtil.java | 22 ++- .../MllpTcpClientProducerBlueprintTest.java | 3 +- .../MllpTcpServerConsumerBindTimeoutTest.java | 117 ++++++++++++++ .../mllp/MllpTcpServerConsumerTest.java | 2 + .../junit/rule/mllp/MllpServerResource.java | 113 +++++++++++--- 9 files changed, 385 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-mllp/pom.xml b/components/camel-mllp/pom.xml index e49249b..c2cfa0c 100644 --- a/components/camel-mllp/pom.xml +++ b/components/camel-mllp/pom.xml @@ -73,17 +73,4 @@ </dependencies> - <!-- Ensure that test runs do no leave running JVMs --> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <shutdown>kill</shutdown> - </configuration> - </plugin> - </plugins> - </build> - </project> http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java index e93e408..3a274ef 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java @@ -47,8 +47,8 @@ public class MllpEndpoint extends DefaultEndpoint { private static final Logger LOG = LoggerFactory.getLogger(MllpEndpoint.class); - @UriPath(defaultValue = "0.0.0.0") - String hostname = "0.0.0.0"; + @UriPath(defaultValue = "null") + String hostname; @UriPath(description = "TCP Port for connection") int port = -1; @@ -56,9 +56,12 @@ public class MllpEndpoint extends DefaultEndpoint { @UriParam(defaultValue = "5") int backlog = 5; - @UriParam(defaultValue = "30000", description = "TCP Server only - timeout value while waiting for a TCP listener to start (milliseconds)") + @UriParam(defaultValue = "30000") int bindTimeout = 30000; + @UriParam(defaultValue = "5000") + int bindRetryInterval = 5000; + @UriParam(defaultValue = "60000") int acceptTimeout = 60000; @@ -160,7 +163,9 @@ public class MllpEndpoint extends DefaultEndpoint { } /** - * Hostname or IP for connection for the TCP connection + * Hostname or IP for connection for the TCP connection. + * + * The default value is null, which means any local IP address * * @param hostname Hostname or IP */ @@ -193,6 +198,28 @@ public class MllpEndpoint extends DefaultEndpoint { this.backlog = backlog; } + public int getBindTimeout() { + return bindTimeout; + } + + /** + * TCP Server Only - The number of milliseconds to retry binding to a server port + */ + public void setBindTimeout(int bindTimeout) { + this.bindTimeout = bindTimeout; + } + + public int getBindRetryInterval() { + return bindRetryInterval; + } + + /** + * TCP Server Only - The number of milliseconds to wait between bind attempts + */ + public void setBindRetryInterval(int bindRetryInterval) { + this.bindRetryInterval = bindRetryInterval; + } + public int getAcceptTimeout() { return acceptTimeout; } http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java index f95899f..17979c4 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java @@ -105,10 +105,11 @@ public class MllpTcpClientProducer extends DefaultProducer { } log.debug("Reading acknowledgement from external system"); - byte[] acknowledgementBytes; + byte[] acknowledgementBytes = null; try { - MllpUtil.openFrame(socket); - acknowledgementBytes = MllpUtil.closeFrame(socket); + if (MllpUtil.openFrame(socket)) { + acknowledgementBytes = MllpUtil.closeFrame(socket); + } } catch (SocketTimeoutException timeoutEx) { exchange.setException(new MllpAcknowledgementTimoutException("Acknowledgement timout", timeoutEx)); return; @@ -117,82 +118,84 @@ public class MllpTcpClientProducer extends DefaultProducer { return; } - log.debug("Populating the exchange with the acknowledgement from the external system"); - message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes); - - message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString()); - message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress()); - - // Now, extract the acknowledgement type and check for a NACK - byte fieldDelim = acknowledgementBytes[3]; - // First, find the beginning of the MSA segment - should be the second segment - int msaStartIndex = -1; - for (int i = 0; i < acknowledgementBytes.length; ++i) { - if (SEGMENT_DELIMITER == acknowledgementBytes[i]) { - final byte bM = 77; - final byte bS = 83; - final byte bC = 67; - final byte bA = 65; - final byte bE = 69; - final byte bR = 82; + if (null != acknowledgementBytes) { + log.debug("Populating the exchange with the acknowledgement from the external system"); + message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes); + + message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString()); + message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress()); + + // Now, extract the acknowledgement type and check for a NACK + byte fieldDelim = acknowledgementBytes[3]; + // First, find the beginning of the MSA segment - should be the second segment + int msaStartIndex = -1; + for (int i = 0; i < acknowledgementBytes.length; ++i) { + if (SEGMENT_DELIMITER == acknowledgementBytes[i]) { + final byte bM = 77; + final byte bS = 83; + final byte bC = 67; + final byte bA = 65; + final byte bE = 69; + final byte bR = 82; /* We've found the start of a new segment - make sure peeking ahead won't run off the end of the array - we need at least 7 more bytes */ - if (acknowledgementBytes.length > i + 7) { - // We can safely peek ahead - if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) { - // Found the beginning of the MSA - the next two bytes should be our acknowledgement code - msaStartIndex = i + 1; - if (bA != acknowledgementBytes[i + 5] && bC != acknowledgementBytes[i + 5]) { - exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes))); - } else { - String acknowledgemenTypeString; - switch (acknowledgementBytes[i + 6]) { - case bA: - // We have an AA or CA- make sure that's the end of the field - if (fieldDelim != acknowledgementBytes[i + 7]) { - exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes))); - } - if (bA == acknowledgementBytes[i + 5]) { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA"); - } else { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CA"); - } - break; - case bE: - // We have an AE or CE - if (bA == acknowledgementBytes[i + 5]) { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE"); - exchange.setException(new MllpApplicationErrorAcknowledgementException(new String(acknowledgementBytes))); - } else { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CE"); - exchange.setException(new MllpCommitErrorAcknowledgementException(new String(acknowledgementBytes))); - } - break; - case bR: - // We have an AR or CR - if (bA == acknowledgementBytes[i + 5]) { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR"); - exchange.setException(new MllpApplicationRejectAcknowledgementException(new String(acknowledgementBytes))); - } else { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CR"); - exchange.setException(new MllpCommitRejectAcknowledgementException(new String(acknowledgementBytes))); - } - break; - default: + if (acknowledgementBytes.length > i + 7) { + // We can safely peek ahead + if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) { + // Found the beginning of the MSA - the next two bytes should be our acknowledgement code + msaStartIndex = i + 1; + if (bA != acknowledgementBytes[i + 5] && bC != acknowledgementBytes[i + 5]) { exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes))); + } else { + String acknowledgemenTypeString; + switch (acknowledgementBytes[i + 6]) { + case bA: + // We have an AA or CA- make sure that's the end of the field + if (fieldDelim != acknowledgementBytes[i + 7]) { + exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes))); + } + if (bA == acknowledgementBytes[i + 5]) { + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA"); + } else { + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CA"); + } + break; + case bE: + // We have an AE or CE + if (bA == acknowledgementBytes[i + 5]) { + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE"); + exchange.setException(new MllpApplicationErrorAcknowledgementException(new String(acknowledgementBytes))); + } else { + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CE"); + exchange.setException(new MllpCommitErrorAcknowledgementException(new String(acknowledgementBytes))); + } + break; + case bR: + // We have an AR or CR + if (bA == acknowledgementBytes[i + 5]) { + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR"); + exchange.setException(new MllpApplicationRejectAcknowledgementException(new String(acknowledgementBytes))); + } else { + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CR"); + exchange.setException(new MllpCommitRejectAcknowledgementException(new String(acknowledgementBytes))); + } + break; + default: + exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes))); + } } - } - break; + break; + } } } - } - } - if (-1 == msaStartIndex) { - // Didn't find an MSA - exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes))); + } + if (-1 == msaStartIndex) { + // Didn't find an MSA + exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes))); + } } // Check AFTER_SEND Properties if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) { @@ -231,10 +234,15 @@ public class MllpTcpClientProducer extends DefaultProducer { } - SocketAddress address = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort()); - log.debug("Connecting to socket on {}", address); + InetSocketAddress socketAddress; + if (null == endpoint.getHostname()) { + socketAddress = new InetSocketAddress(endpoint.getPort()); + } else { + socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort()); + } + log.debug("Connecting to socket on {}", socketAddress); try { - socket.connect(address, endpoint.connectTimeout); + socket.connect(socketAddress, endpoint.connectTimeout); } catch (SocketTimeoutException e) { return e; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java index 05d13f5..e158dc3 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java @@ -18,6 +18,7 @@ package org.apache.camel.component.mllp; import java.io.IOException; import java.io.InputStream; +import java.net.BindException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; @@ -93,8 +94,27 @@ public class MllpTcpServerConsumer extends DefaultConsumer { // Accept Timeout serverSocket.setSoTimeout(endpoint.acceptTimeout); - InetSocketAddress socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort()); - serverSocket.bind(socketAddress, endpoint.backlog); + InetSocketAddress socketAddress; + if (null == endpoint.getHostname()) { + socketAddress = new InetSocketAddress(endpoint.getPort()); + } else { + socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort()); + } + long startTicks = System.currentTimeMillis(); + + do { + try { + serverSocket.bind(socketAddress, endpoint.backlog); + } catch (BindException bindException) { + if (System.currentTimeMillis() > startTicks + endpoint.getBindTimeout()) { + log.error( "Failed to bind to address {} within timeout {}", socketAddress, endpoint.getBindTimeout()); + throw bindException; + } else { + log.warn( "Failed to bind to address {} - retrying in {} milliseconds", socketAddress, endpoint.getBindRetryInterval()); + Thread.sleep(endpoint.getBindRetryInterval()); + } + } + } while ( !serverSocket.isBound() ); serverSocketThread = new ServerSocketThread(serverSocket); serverSocketThread.start(); @@ -106,6 +126,12 @@ public class MllpTcpServerConsumer extends DefaultConsumer { protected void doStop() throws Exception { log.debug("doStop()"); + // Close any client sockets that are currently open + for (ClientSocketThread clientSocketThread: clientThreads) { + clientSocketThread.interrupt(); + } + + switch (serverSocketThread.getState()) { case TERMINATED: // This is what we hope for @@ -195,7 +221,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { log.debug("Starting acceptor thread"); try { - while (!isInterrupted() && null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) { + while (!isInterrupted() && null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) { // TODO: Need to check maxConnections and figure out what to do when exceeded Socket socket = null; try { @@ -370,7 +396,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { @Override public void run() { - while (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) { + while (!isInterrupted() && null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) { byte[] hl7MessageBytes = null; // Send the message on for processing and wait for the response log.debug("Reading data ...."); @@ -379,7 +405,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer { hl7MessageBytes = MllpUtil.closeFrame(clientSocket); } else { try { - MllpUtil.openFrame(clientSocket); + if (!MllpUtil.openFrame(clientSocket)) { + continue; + } } catch (SocketTimeoutException timeoutEx) { // When thrown by openFrame, it indicates that no data was available - but no error continue; @@ -605,6 +633,18 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } } } + + @Override + public void interrupt() { + if (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) { + try { + clientSocket.close(); + } catch (IOException ex) { + log.warn("Exception encoutered closing client Socket in interrupt", ex); + } + } + super.interrupt(); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java index c1c5aec..72c2514 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; +import java.net.SocketException; import java.net.SocketTimeoutException; import org.apache.camel.component.mllp.MllpComponent; @@ -67,25 +68,34 @@ public final class MllpUtil { * @throws MllpCorruptFrameException if the MLLP Frame is corrupted in some way * @throws MllpException for other unexpected error conditions */ - public static void openFrame(Socket socket) throws SocketTimeoutException, MllpCorruptFrameException, MllpException { + public static boolean openFrame(Socket socket) throws SocketTimeoutException, MllpCorruptFrameException, MllpException { if (socket.isConnected() && !socket.isClosed()) { InputStream socketInputStream = MllpUtil.getInputStream(socket); - int readByte; + int readByte = -1; try { readByte = socketInputStream.read(); switch (readByte) { case START_OF_BLOCK: - return; + return true; case END_OF_STREAM: resetConnection(socket); - return; + return false; default: // Continue on and process the out-of-frame data } } catch (SocketTimeoutException normaTimeoutEx) { // Just pass this on - the caller will wrap it in a MllpTimeoutException throw normaTimeoutEx; + } catch (SocketException socketEx ) { + if (socket.isClosed()) { + LOG.debug( "Socket closed while opening MLLP frame - ignoring exception", socketEx); + return false; + } else { + LOG.error("Unexpected Exception occurred opening MLLP frame - resetting the connection"); + MllpUtil.resetConnection(socket); + throw new MllpException("Unexpected Exception occurred opening MLLP frame", socketEx); + } } catch (IOException unexpectedException) { LOG.error("Unexpected Exception occurred opening MLLP frame - resetting the connection"); MllpUtil.resetConnection(socket); @@ -152,6 +162,8 @@ public final class MllpUtil { throw new MllpCorruptFrameException("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read", outOfFrameData.toByteArray()); } } + + return false; } /** @@ -319,7 +331,7 @@ public final class MllpUtil { } public static void resetConnection(Socket socket) { - if (null != socket) { + if (null != socket && !socket.isClosed()) { try { socket.setSoLinger(true, 0); } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java index 4233d80..bb87146 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java @@ -26,6 +26,7 @@ import org.apache.camel.ProducerTemplate; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.DefaultComponentResolver; import org.apache.camel.spi.ComponentResolver; +import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.blueprint.CamelBlueprintTestSupport; import org.apache.camel.test.junit.rule.mllp.MllpServerResource; import org.apache.camel.util.KeyValueHolder; @@ -36,7 +37,7 @@ import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage; public class MllpTcpClientProducerBlueprintTest extends CamelBlueprintTestSupport { @Rule - public MllpServerResource mllpServer = new MllpServerResource(); + public MllpServerResource mllpServer = new MllpServerResource(AvailablePortFinder.getNextAvailable()); final String sourceUri = "direct://source"; final String mockAcknowledgedUri = "mock://acknowledged"; http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java new file mode 100644 index 0000000..3ea892f --- /dev/null +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +import java.net.ServerSocket; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.EndpointInject; +import org.apache.camel.LoggingLevel; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit.rule.mllp.MllpClientResource; +import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Rule; +import org.junit.Test; + +import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage; + +public class MllpTcpServerConsumerBindTimeoutTest extends CamelTestSupport { + @Rule + public MllpClientResource mllpClient = new MllpClientResource(); + + @EndpointInject(uri = "mock://result") + MockEndpoint result; + + @Override + protected CamelContext createCamelContext() throws Exception { + DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext(); + + context.setUseMDCLogging(true); + context.setName(this.getClass().getSimpleName()); + + return context; + } + + @Override + public boolean isUseAdviceWith() { + return true; + } + + @Override + protected RouteBuilder createRouteBuilder() { + + mllpClient.setMllpHost("localhost"); + mllpClient.setMllpPort(AvailablePortFinder.getNextAvailable()); + + return new RouteBuilder() { + int connectTimeout = 500; + int responseTimeout = 5000; + + @Override + public void configure() throws Exception { + String routeId = "mllp-test-receiver-route"; + + onCompletion() + .toF("log:%s?level=INFO&showAll=true", routeId) + .log(LoggingLevel.INFO, routeId, "Test route complete"); + + fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d", + mllpClient.getMllpHost(), mllpClient.getMllpPort(), connectTimeout, responseTimeout) + .routeId(routeId) + .log(LoggingLevel.INFO, routeId, "Test route received message") + .to(result); + + } + }; + } + + @Test + public void testReceiveSingleMessage() throws Exception { + result.expectedMessageCount(1); + + Thread tmpThread = new Thread() { + public void run() { + try { + ServerSocket tmpSocket = new ServerSocket(mllpClient.getMllpPort()); + Thread.sleep(15000); + tmpSocket.close(); + } catch (Exception ex) { + throw new RuntimeException( "Exception caught in dummy listener", ex); + } + } + + }; + + tmpThread.start(); + + context.start(); + + mllpClient.connect(); + + mllpClient.sendMessageAndWaitForAcknowledgement(generateMessage(), 10000); + + assertMockEndpointsSatisfied(10, TimeUnit.SECONDS); + } + +} + http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java index c77c35f..eb6a463 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.mllp; +import java.net.BindException; +import java.net.ServerSocket; import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java index c148310..8215d1b 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java @@ -24,6 +24,8 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; +import java.util.LinkedList; +import java.util.List; import java.util.regex.Pattern; import org.junit.rules.ExternalResource; @@ -436,11 +438,11 @@ public class MllpServerResource extends ExternalResource { } void resetConnection(Socket socket) { - if (null != socket) { + if (null != socket && !socket.isClosed()) { try { socket.setSoLinger(true, 0); - } catch (Exception ex) { - log.warn("Exception encountered setting SO_LINGER to 0 on the socket to force a reset", ex); + } catch (SocketException socketEx) { + log.debug("SocketException encountered setting SO_LINGER to 0 on the socket to force a reset - ignoring", socketEx); } finally { closeConnection(socket); } @@ -455,6 +457,7 @@ public class MllpServerResource extends ExternalResource { Logger log = LoggerFactory.getLogger(this.getClass()); ServerSocket serverSocket; + List<ClientSocketThread> clientSocketThreads = new LinkedList<>(); String listenHost = "0.0.0.0"; int listenPort; @@ -519,30 +522,47 @@ public class MllpServerResource extends ExternalResource { public void run() { log.info("Accepting connections on port {}", serverSocket.getLocalPort()); this.setName("MllpServerResource$ServerSocketThread - " + serverSocket.getLocalSocketAddress().toString()); - while (isActive() && serverSocket.isBound()) { + while (!isInterrupted() && serverSocket.isBound() && !serverSocket.isClosed()) { Socket clientSocket = null; try { clientSocket = serverSocket.accept(); - clientSocket.setKeepAlive(true); - clientSocket.setTcpNoDelay(false); - clientSocket.setSoLinger(false, -1); - clientSocket.setSoTimeout(5000); - ClientSocketThread clientSocketThread = new ClientSocketThread(clientSocket); - clientSocketThread.setDaemon(true); - clientSocketThread.start(); } catch (SocketTimeoutException timeoutEx) { if (raiseExceptionOnAcceptTimeout) { throw new MllpJUnitResourceTimeoutException("Timeout Accepting client connection", timeoutEx); } - continue; - } catch (IOException e) { - log.warn("IOException creating Client Socket"); + log.warn("Timeout waiting for client connection"); + } catch (SocketException socketEx) { + log.debug("SocketException encountered accepting client connection - ignoring", socketEx); + if (null == clientSocket) { + continue; + } else if (!clientSocket.isClosed()) { + resetConnection(clientSocket); + continue; + } else { + throw new MllpJUnitResourceException("Unexpected SocketException encountered accepting client connection", socketEx); + } + } catch (Exception ex) { + throw new MllpJUnitResourceException("Unexpected exception encountered accepting client connection", ex); + } + if (null != clientSocket) { try { - clientSocket.close(); - } catch (IOException e1) { - log.warn("Exceptiong encountered closing client socket after attempting to accept connection"); + clientSocket.setKeepAlive(true); + clientSocket.setTcpNoDelay(false); + clientSocket.setSoLinger(false, -1); + clientSocket.setSoTimeout(5000); + ClientSocketThread clientSocketThread = new ClientSocketThread(clientSocket); + clientSocketThread.setDaemon(true); + clientSocketThread.start(); + clientSocketThreads.add(clientSocketThread); + } catch (Exception unexpectedEx) { + log.warn("Unexpected exception encountered configuring client socket"); + try { + clientSocket.close(); + } catch (IOException ingoreEx) { + log.warn("Exceptiong encountered closing client socket after attempting to accept connection", ingoreEx); + } + throw new MllpJUnitResourceException("Unexpected exception encountered configuring client socket", unexpectedEx); } - throw new MllpJUnitResourceException("IOException creating Socket", e); } } log.info("No longer accepting connections - closing TCP Listener on port {}", serverSocket.getLocalPort()); @@ -599,6 +619,22 @@ public class MllpServerResource extends ExternalResource { public void setRaiseExceptionOnAcceptTimeout(boolean raiseExceptionOnAcceptTimeout) { this.raiseExceptionOnAcceptTimeout = raiseExceptionOnAcceptTimeout; } + + @Override + public void interrupt() { + for (ClientSocketThread clientSocketThread: clientSocketThreads) { + clientSocketThread.interrupt(); + } + + if (serverSocket != null && serverSocket.isBound() && !serverSocket.isClosed()) { + try { + serverSocket.close(); + } catch (Exception ex) { + log.warn("Exception encountered closing server socket on interrupt", ex); + } + } + super.interrupt(); + } } /** @@ -638,8 +674,20 @@ public class MllpServerResource extends ExternalResource { log.info("Handling Connection: {} -> {}", localAddress, remoteAddress); try { - while (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) { - InputStream instream = clientSocket.getInputStream(); + while (!isInterrupted() && null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) { + InputStream instream; + try { + instream = clientSocket.getInputStream(); + } catch (IOException ioEx) { + if (clientSocket.isClosed()) { + log.debug( "Client socket was closed - ignoring exception", clientSocket); + break; + } else { + throw new MllpJUnitResourceException( "Unexpected IOException encounted getting input stream", ioEx); + } + } catch (Exception unexpectedEx) { + throw new MllpJUnitResourceException( "Unexpected exception encounted getting input stream", unexpectedEx); + } String parsedHL7Message = getMessage(instream); if (null != parsedHL7Message && parsedHL7Message.length() > 0) { @@ -721,7 +769,7 @@ public class MllpServerResource extends ExternalResource { } } - log.info("Connection Finished: {} -> {}", localAddress, remoteAddress); + log.debug("Client Connection Finished: {} -> {}", localAddress, remoteAddress); } /** @@ -749,8 +797,15 @@ public class MllpServerResource extends ExternalResource { } } } catch (SocketException socketEx) { - log.error("Unable to read from socket stream when expected bMLLP_ENVELOPE_START_OF_BLOCK - resetting connection ", socketEx); - resetConnection(clientSocket); + if (clientSocket.isClosed()) { + log.info("Client socket closed while waiting for MLLP_ENVELOPE_START_OF_BLOCK"); + } else if ( clientSocket.isConnected() ) { + log.info( "SocketException encountered while waiting for MLLP_ENVELOPE_START_OF_BLOCK"); + resetConnection(clientSocket); + } else { + log.error("Unable to read from socket stream when expected bMLLP_ENVELOPE_START_OF_BLOCK - resetting connection ", socketEx); + resetConnection(clientSocket); + } return null; } @@ -872,6 +927,18 @@ public class MllpServerResource extends ExternalResource { return null; } + + @Override + public void interrupt() { + if (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) { + try { + clientSocket.close(); + } catch (Exception ex) { + log.warn("Exception encountered closing client socket on interrput", ex); + } + } + super.interrupt(); + } }