This is an automated email from the ASF dual-hosted git repository. quinn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new b3468ef CAMEL-12215 - add lenientBind URI option b3468ef is described below commit b3468ef4db9d17b137fc26268d83426d20890cf9 Author: Quinn Stevenson <qu...@apache.org> AuthorDate: Tue Jan 30 16:37:04 2018 -0700 CAMEL-12215 - add lenientBind URI option --- .../camel-mllp/src/main/docs/mllp-component.adoc | 15 +-- .../camel/component/mllp/MllpConfiguration.java | 23 +++- .../apache/camel/component/mllp/MllpEndpoint.java | 4 + .../component/mllp/MllpTcpServerConsumer.java | 103 ++++++---------- .../mllp/internal/TcpServerBindThread.java | 129 +++++++++++++++++++++ .../MllpTcpServerConsumerLenientBindTest.java | 99 ++++++++++++++++ .../mllp/MllpTcpServerConsumerConnectionTest.java | 30 ++--- ...rConsumerEndOfDataAndValidationTestSupport.java | 5 +- .../src/test/resources/log4j2.properties | 2 +- .../springboot/MllpComponentConfiguration.java | 18 +++ 10 files changed, 328 insertions(+), 100 deletions(-) diff --git a/components/camel-mllp/src/main/docs/mllp-component.adoc b/components/camel-mllp/src/main/docs/mllp-component.adoc index 2e1336a..972fa31 100644 --- a/components/camel-mllp/src/main/docs/mllp-component.adoc +++ b/components/camel-mllp/src/main/docs/mllp-component.adoc @@ -79,7 +79,7 @@ with the following path and query parameters: | *port* | *Required* Port number for the TCP connection | | int |=== -==== Query Parameters (26 parameters): +==== Query Parameters (27 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== @@ -95,18 +95,19 @@ with the following path and query parameters: | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean | *backlog* (tcp) | The maximum queue length for incoming connection indications (a request to connect) is set to the backlog parameter. If a connection indication arrives when the queue is full the connection is refused. | 5 | Integer +| *lenientBind* (tcp) | TCP Server Only - Allow the endpoint to start before the TCP ServerSocket is bound. In some environments it may be desirable to allow the endpoint to start before the TCP ServerSocket is bound. | false | boolean | *maxConcurrentConsumers* (tcp) | The maximum number of concurrent MLLP Consumer connections that will be allowed. If a new connection is received and the maximum is number are already established the new connection will be reset immediately. | 5 | int -| *reuseAddress* (tcp) | Enable/disable the SO_REUSEADDR socket option. | true | Boolean +| *receiveBufferSize* (tcp) | Sets the SO_RCVBUF option to the specified value (in bytes) | 8192 | Integer +| *reuseAddress* (tcp) | Enable/disable the SO_REUSEADDR socket option. | false | Boolean | *acceptTimeout* (timeout) | Timeout (in milliseconds) while waiting for a TCP connection TCP Server Only | 60000 | int | *bindRetryInterval* (timeout) | TCP Server Only - The number of milliseconds to wait between bind attempts | 5000 | int | *bindTimeout* (timeout) | TCP Server Only - The number of milliseconds to retry binding to a server port | 30000 | int -| *keepAlive* (tcp) | Enable/disable the SO_KEEPALIVE socket option. | true | Boolean +| *tcpNoDelay* (tcp) | Enable/disable the TCP_NODELAY socket option. | true | Boolean | *connectTimeout* (timeout) | Timeout (in milliseconds) for establishing for a TCP connection TCP Client only | 30000 | int -| *receiveBufferSize* (tcp) | Sets the SO_RCVBUF option to the specified value (in bytes) | 8192 | Integer -| *sendBufferSize* (tcp) | Sets the SO_SNDBUF option to the specified value (in bytes) | 8192 | Integer | *idleTimeout* (timeout) | The approximate idle time allowed before the Client TCP Connection will be reset. A null value or a value less than or equal to zero will disable the idle timeout. | | Integer | *maxReceiveTimeouts* (timeout) | *Deprecated* The maximum number of timeouts (specified by receiveTimeout) allowed before the TCP Connection will be reset. | | Integer -| *tcpNoDelay* (tcp) | Enable/disable the TCP_NODELAY socket option. | true | Boolean +| *keepAlive* (tcp) | Enable/disable the SO_KEEPALIVE socket option. | true | Boolean +| *sendBufferSize* (tcp) | Sets the SO_SNDBUF option to the specified value (in bytes) | 8192 | Integer | *readTimeout* (timeout) | The SO_TIMEOUT value (in milliseconds) used after the start of an MLLP frame has been received | 500 | int | *receiveTimeout* (timeout) | The SO_TIMEOUT value (in milliseconds) used when waiting for the start of an MLLP frame | 15000 | int | *charsetName* (codec) | Set the CamelCharsetName property on the exchange | | String @@ -195,4 +196,4 @@ The MLLP Producer adds these headers on the Camel message: |=================================== All headers are String types. If a header value is missing, its value -is null. \ No newline at end of file +is null. diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java index 70a756b..4d5e46b 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java @@ -40,6 +40,9 @@ public class MllpConfiguration implements Cloneable { @UriParam(label = "advanced,consumer,tcp,timeout", defaultValue = "5000") int bindRetryInterval = 5000; + @UriParam(label = "advanced,consumer,tcp", defaultValue = "false") + boolean lenientBind; + @UriParam(label = "advanced,consumer,tcp,timeout", defaultValue = "60000") int acceptTimeout = 60000; @@ -68,8 +71,8 @@ public class MllpConfiguration implements Cloneable { @UriParam(label = "advanced,producer,tcp", defaultValue = "true") Boolean tcpNoDelay = true; - @UriParam(label = "advanced,consumer,tcp", defaultValue = "true") - Boolean reuseAddress = true; + @UriParam(label = "advanced,consumer,tcp", defaultValue = "false") + Boolean reuseAddress = false; @UriParam(label = "advanced,tcp", defaultValue = "8192") Integer receiveBufferSize = 8192; @@ -206,6 +209,22 @@ public class MllpConfiguration implements Cloneable { return acceptTimeout; } + public boolean isLenientBind() { + return lenientBind; + } + + /** + * TCP Server Only - Allow the endpoint to start before the TCP ServerSocket is bound. + * + * In some environments, it may be desirable to allow the endpoint to start before the TCP ServerSocket + * is bound. + * + * @param lenientBind if true, the ServerSocket will be bound asynchronously; otherwise the ServerSocket will be bound synchronously. + */ + public void setLenientBind(boolean lenientBind) { + this.lenientBind = lenientBind; + } + /** * Timeout (in milliseconds) while waiting for a TCP connection * <p/> diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java index 5c0338a..6bffef5 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java @@ -278,6 +278,10 @@ public class MllpEndpoint extends DefaultEndpoint { configuration.setBindRetryInterval(bindRetryInterval); } + public void setLenientBind(boolean lenientBind) { + configuration.setLenientBind(lenientBind); + } + public void setAcceptTimeout(int acceptTimeout) { configuration.setAcceptTimeout(acceptTimeout); } diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java index 72b00bd..b0cdcfa 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java @@ -18,7 +18,6 @@ package org.apache.camel.component.mllp; import java.net.BindException; -import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.Date; @@ -37,8 +36,9 @@ import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.mllp.internal.MllpSocketBuffer; -import org.apache.camel.component.mllp.internal.TcpServerConsumerValidationRunnable; import org.apache.camel.component.mllp.internal.TcpServerAcceptThread; +import org.apache.camel.component.mllp.internal.TcpServerBindThread; +import org.apache.camel.component.mllp.internal.TcpServerConsumerValidationRunnable; import org.apache.camel.component.mllp.internal.TcpSocketConsumerRunnable; import org.apache.camel.impl.DefaultConsumer; @@ -49,7 +49,10 @@ import org.apache.camel.impl.DefaultConsumer; public class MllpTcpServerConsumer extends DefaultConsumer { final ExecutorService validationExecutor; final ExecutorService consumerExecutor; + + TcpServerBindThread bindThread; TcpServerAcceptThread acceptThread; + Map<TcpSocketConsumerRunnable, Long> consumerRunnables = new ConcurrentHashMap<>(); @@ -110,75 +113,51 @@ public class MllpTcpServerConsumer extends DefaultConsumer { consumerClientSocketThread.stop(); } - acceptThread.interrupt(); + if (acceptThread != null) { + acceptThread.interrupt(); + acceptThread = null; + } - acceptThread = null; + if (bindThread != null) { + bindThread.interrupt(); + bindThread = null; + } super.doStop(); } @Override protected void doStart() throws Exception { - log.debug("doStart() - starting acceptor"); - - ServerSocket serverSocket = new ServerSocket(); - if (getConfiguration().hasReceiveBufferSize()) { - serverSocket.setReceiveBufferSize(getConfiguration().getReceiveBufferSize()); - } - - if (getConfiguration().hasReuseAddress()) { - serverSocket.setReuseAddress(getConfiguration().getReuseAddress()); - } + if (bindThread == null || !bindThread.isAlive()) { + bindThread = new TcpServerBindThread(this); - // Accept Timeout - serverSocket.setSoTimeout(getConfiguration().getAcceptTimeout()); - - InetSocketAddress socketAddress; - if (null == getEndpoint().getHostname()) { - socketAddress = new InetSocketAddress(getEndpoint().getPort()); - } else { - socketAddress = new InetSocketAddress(getEndpoint().getHostname(), getEndpoint().getPort()); - } - long startTicks = System.currentTimeMillis(); - - // Log usage of deprecated URI options - if (getConfiguration().hasMaxReceiveTimeouts()) { - if (getConfiguration().hasIdleTimeout()) { - log.info("Both maxReceivedTimeouts {} and idleTimeout {} URI options are specified - idleTimeout will be used", - getConfiguration().getMaxReceiveTimeouts(), getConfiguration().getIdleTimeout()); + if (getConfiguration().isLenientBind()) { + log.debug("doStart() - starting bind thread"); + bindThread.start(); } else { - getConfiguration().setIdleTimeout(getConfiguration().getMaxReceiveTimeouts() * getConfiguration().getReceiveTimeout()); - log.info("Deprecated URI option maxReceivedTimeouts {} specified - idleTimeout {} will be used", getConfiguration().getMaxReceiveTimeouts(), getConfiguration().getIdleTimeout()); - } - } + log.debug("doStart() - attempting to bind to port {}", getEndpoint().getPort()); + bindThread.run(); - do { - try { - if (getConfiguration().hasBacklog()) { - serverSocket.bind(socketAddress, getConfiguration().getBacklog()); - } else { - serverSocket.bind(socketAddress); - } - } catch (BindException bindException) { - if (System.currentTimeMillis() > startTicks + getConfiguration().getBindTimeout()) { - log.error("Failed to bind to address {} within timeout {}", socketAddress, getConfiguration().getBindTimeout()); - throw bindException; - } else { - log.warn("Failed to bind to address {} - retrying in {} milliseconds", socketAddress, getConfiguration().getBindRetryInterval()); - Thread.sleep(getConfiguration().getBindRetryInterval()); + if (this.acceptThread == null) { + throw new BindException("Failed to bind to port " + getEndpoint().getPort()); } } - } while (!serverSocket.isBound()); - - // acceptRunnable = new TcpServerConsumerValidationRunnable(this, serverSocket); - // validationExecutor.submit(acceptRunnable); - acceptThread = new TcpServerAcceptThread(this, serverSocket); - acceptThread.start(); + } super.doStart(); } @Override + public void handleException(Throwable t) { + super.handleException(t); + } + + @Override + public void handleException(String message, Throwable t) { + super.handleException(message, t); + } + + @Override protected void doShutdown() throws Exception { super.doShutdown(); consumerExecutor.shutdownNow(); @@ -209,6 +188,11 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } } + public void startAcceptThread(ServerSocket serverSocket) { + acceptThread = new TcpServerAcceptThread(this, serverSocket); + acceptThread.start(); + } + public void startConsumer(Socket clientSocket, MllpSocketBuffer mllpBuffer) { TcpSocketConsumerRunnable client = new TcpSocketConsumerRunnable(this, clientSocket, mllpBuffer); @@ -222,15 +206,4 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } } - - @Override - public void handleException(Throwable t) { - super.handleException(t); - } - - @Override - public void handleException(String message, Throwable t) { - super.handleException(message, t); - } } - diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java new file mode 100644 index 0000000..893cf9e --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.mllp.internal; + +import java.io.IOException; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; + +import org.apache.camel.Route; +import org.apache.camel.component.mllp.MllpTcpServerConsumer; +import org.apache.camel.impl.MDCUnitOfWork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * Runnable to handle the ServerSocket.accept requests + */ +public class TcpServerBindThread extends Thread { + private final Logger log = LoggerFactory.getLogger(this.getClass()); + private final MllpTcpServerConsumer consumer; + + public TcpServerBindThread(MllpTcpServerConsumer consumer) { + this.consumer = consumer; + + // Get the URI without options + String fullEndpointKey = consumer.getEndpoint().getEndpointKey(); + String endpointKey; + if (fullEndpointKey.contains("?")) { + endpointKey = fullEndpointKey.substring(0, fullEndpointKey.indexOf('?')); + } else { + endpointKey = fullEndpointKey; + } + + this.setName(String.format("%s - %s", this.getClass().getSimpleName(), endpointKey)); + } + + + /** + * Do the initial read on the Socket and try to determine if it has HL7 data, junk, or nothing. + */ + @Override + public void run() { + + MDC.put(MDCUnitOfWork.MDC_CAMEL_CONTEXT_ID, consumer.getEndpoint().getCamelContext().getName()); + + Route route = consumer.getRoute(); + if (route != null) { + String routeId = route.getId(); + if (routeId != null) { + MDC.put(MDCUnitOfWork.MDC_ROUTE_ID, route.getId()); + } + } + + try { + ServerSocket serverSocket = new ServerSocket(); + if (consumer.getConfiguration().hasReceiveBufferSize()) { + serverSocket.setReceiveBufferSize(consumer.getConfiguration().getReceiveBufferSize()); + } + + if (consumer.getConfiguration().hasReuseAddress()) { + serverSocket.setReuseAddress(consumer.getConfiguration().getReuseAddress()); + } + + // Accept Timeout + serverSocket.setSoTimeout(consumer.getConfiguration().getAcceptTimeout()); + + InetSocketAddress socketAddress; + if (null == consumer.getEndpoint().getHostname()) { + socketAddress = new InetSocketAddress(consumer.getEndpoint().getPort()); + } else { + socketAddress = new InetSocketAddress(consumer.getEndpoint().getHostname(), consumer.getEndpoint().getPort()); + } + + log.debug("Attempting to bind to {}", socketAddress); + + long startTicks = System.currentTimeMillis(); + do { + try { + if (consumer.getConfiguration().hasBacklog()) { + serverSocket.bind(socketAddress, consumer.getConfiguration().getBacklog()); + } else { + serverSocket.bind(socketAddress); + } + consumer.startAcceptThread(serverSocket); + } catch (BindException bindException) { + if (System.currentTimeMillis() > startTicks + consumer.getConfiguration().getBindTimeout()) { + log.error("Failed to bind to address {} within timeout {}", socketAddress, consumer.getConfiguration().getBindTimeout(), bindException); + break; + } else { + log.warn("Failed to bind to address {} - retrying in {} milliseconds", socketAddress, consumer.getConfiguration().getBindRetryInterval()); + try { + Thread.sleep(consumer.getConfiguration().getBindRetryInterval()); + } catch (InterruptedException interruptedEx) { + log.info("Bind to address {} interrupted", socketAddress, interruptedEx); + if (!this.isInterrupted()) { + super.interrupt(); + } + break; + } + } + } catch (IOException unexpectedEx) { + log.error("Unexpected exception encountered binding to address {}", socketAddress, unexpectedEx); + break; + } + } while (!this.isInterrupted() && !serverSocket.isBound()); + + } catch (IOException ioEx) { + log.error("Unexpected exception encountered initializing ServerSocket before attempting to bind", ioEx); + } + } + +} diff --git a/components/camel-mllp/src/test/java/org/apache/camel/MllpTcpServerConsumerLenientBindTest.java b/components/camel-mllp/src/test/java/org/apache/camel/MllpTcpServerConsumerLenientBindTest.java new file mode 100644 index 0000000..d0a681a --- /dev/null +++ b/components/camel-mllp/src/test/java/org/apache/camel/MllpTcpServerConsumerLenientBindTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel; + +import java.net.ServerSocket; +import java.net.SocketTimeoutException; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit.rule.mllp.MllpClientResource; +import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException; +import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceTimeoutException; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.camel.test.mllp.Hl7TestMessageGenerator; +import org.junit.Rule; +import org.junit.Test; + +public class MllpTcpServerConsumerLenientBindTest extends CamelTestSupport { + static final int RECEIVE_TIMEOUT = 1000; + static final int READ_TIMEOUT = 500; + + @Rule + public MllpClientResource mllpClient = new MllpClientResource(); + + @EndpointInject(uri = "mock://result") + MockEndpoint result; + + ServerSocket portBlocker; + + @Override + protected void doPreSetup() throws Exception { + mllpClient.setMllpHost("localhost"); + mllpClient.setMllpPort(AvailablePortFinder.getNextAvailable()); + + portBlocker = new ServerSocket(mllpClient.getMllpPort()); + + assertTrue(portBlocker.isBound()); + + super.doPreSetup(); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + RouteBuilder builder = new RouteBuilder() { + String routeId = "mllp-receiver-with-lenient-bind"; + + public void configure() { + fromF("mllp://%s:%d?bindTimeout=15000&bindRetryInterval=500&receiveTimeout=%d&readTimeout=%d&reuseAddress=false&lenientBind=true", + mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT, READ_TIMEOUT) + .routeId(routeId) + .log(LoggingLevel.INFO, routeId, "Receiving: ${body}") + .to(result); + } + }; + + return builder; + } + + @Test + public void testLenientBind() throws Exception { + assertEquals(ServiceStatus.Started, context.getStatus()); + + mllpClient.connect(); + try { + mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(10001)); + } catch (MllpJUnitResourceTimeoutException expectedEx) { + assertIsInstanceOf(SocketTimeoutException.class, expectedEx.getCause()); + } + mllpClient.reset(); + + portBlocker.close(); + Thread.sleep(2000); + assertEquals(ServiceStatus.Started, context.getStatus()); + + mllpClient.connect(); + String acknowledgement = mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(10002)); + assertStringContains(acknowledgement, "10002"); + } + + + +} diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java index c366465..0a07c1b 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java @@ -17,8 +17,6 @@ package org.apache.camel.component.mllp; -import java.net.SocketException; -import java.net.SocketTimeoutException; import java.util.concurrent.TimeUnit; import org.apache.camel.EndpointInject; @@ -37,7 +35,6 @@ import org.junit.Rule; import org.junit.Test; import static org.hamcrest.CoreMatchers.anyOf; -import static org.hamcrest.CoreMatchers.instanceOf; public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport { static final int RECEIVE_TIMEOUT = 1000; @@ -63,20 +60,6 @@ public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport { super.doPreSetup(); } - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - String routeId = "mllp-receiver"; - - public void configure() { - fromF("mllp://%s:%d?receiveTimeout=%d&readTimeout=%d&autoAck=false", mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT, READ_TIMEOUT) - .log(LoggingLevel.INFO, routeId, "Receiving: ${body}") - .to(result); - } - }; - - } - /** * Simulate a Load Balancer Probe * <p/> @@ -97,7 +80,7 @@ public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport { result.setExpectedCount(0); result.setAssertPeriod(1000); - addTestRoute(-1); + addTestRouteWithIdleTimeout(-1); for (int i = 1; i <= connectionCount; ++i) { mllpClient.connect(); @@ -121,7 +104,7 @@ public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport { result.setExpectedCount(0); result.setAssertPeriod(1000); - addTestRoute(-1); + addTestRouteWithIdleTimeout(-1); for (int i = 1; i <= connectionCount; ++i) { mllpClient.connect(); @@ -150,7 +133,7 @@ public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport { result.setExpectedCount(1); result.setAssertPeriod(1000); - addTestRoute(idleTimeout); + addTestRouteWithIdleTimeout(idleTimeout); mllpClient.connect(); mllpClient.sendMessageAndWaitForAcknowledgement(testMessage); @@ -167,12 +150,13 @@ public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport { assertMockEndpointsSatisfied(15, TimeUnit.SECONDS); } - void addTestRoute(final int idleTimeout) throws Exception { + void addTestRouteWithIdleTimeout(final int idleTimeout) throws Exception { RouteBuilder builder = new RouteBuilder() { - String routeId = "mllp-receiver"; + String routeId = "mllp-receiver-with-timeout"; public void configure() { - fromF("mllp://%s:%d?receiveTimeout=%d&idleTimeout=%d", mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT, idleTimeout) + fromF("mllp://%s:%d?receiveTimeout=%d&readTimeout=%d&idleTimeout=%d", mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT, READ_TIMEOUT, idleTimeout) + .routeId(routeId) .log(LoggingLevel.INFO, routeId, "Receiving: ${body}") .to(result); } diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java index fe6e6f7..ed2a56e 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java @@ -17,8 +17,6 @@ package org.apache.camel.component.mllp; -import static org.hamcrest.CoreMatchers.instanceOf; - import java.net.SocketException; import java.util.concurrent.TimeUnit; @@ -35,9 +33,12 @@ import org.apache.camel.test.junit.rule.mllp.MllpClientResource; import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException; import org.apache.camel.test.junit4.CamelTestSupport; import org.apache.camel.test.mllp.Hl7TestMessageGenerator; + import org.junit.Rule; import org.junit.Test; +import static org.hamcrest.CoreMatchers.instanceOf; + public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport extends CamelTestSupport { static final int CONNECT_TIMEOUT = 500; static final int RECEIVE_TIMEOUT = 1000; diff --git a/components/camel-mllp/src/test/resources/log4j2.properties b/components/camel-mllp/src/test/resources/log4j2.properties index 4179677..7e3ad87 100644 --- a/components/camel-mllp/src/test/resources/log4j2.properties +++ b/components/camel-mllp/src/test/resources/log4j2.properties @@ -31,4 +31,4 @@ rootLogger.appenderRef.file.ref = file loggers = mllp logger.mllp.name = org.apache.camel.component.mllp -logger.mllp.level = DEBUG +# logger.mllp.level = DEBUG diff --git a/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java index aeff923..0ff738a 100644 --- a/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java @@ -122,6 +122,16 @@ public class MllpComponentConfiguration */ private Integer acceptTimeout = 60000; /** + * TCP Server Only - Allow the endpoint to start before the TCP + * ServerSocket is bound. In some environments, it may be desirable to + * allow the endpoint to start before the TCP ServerSocket is bound. + * + * @param lenientBind + * if true, the ServerSocket will be bound asynchronously; + * otherwise the ServerSocket will be bound synchronously. + */ + private Boolean lenientBind = false; + /** * Timeout (in milliseconds) for establishing for a TCP connection * <p/> * TCP Client only @@ -315,6 +325,14 @@ public class MllpComponentConfiguration this.acceptTimeout = acceptTimeout; } + public Boolean getLenientBind() { + return lenientBind; + } + + public void setLenientBind(Boolean lenientBind) { + this.lenientBind = lenientBind; + } + public Integer getConnectTimeout() { return connectTimeout; } -- To stop receiving notification emails like this one, please contact qu...@apache.org.