This is an automated email from the ASF dual-hosted git repository. quinn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 89bcfa0 CAMEL-12333 - Fix MllpTcpConsumer to only reset connections when they are idle for the specified timeout 89bcfa0 is described below commit 89bcfa0753233f8b576154486a89a2677d9d3efd Author: Quinn Stevenson <qu...@apache.org> AuthorDate: Wed Mar 7 14:49:16 2018 -0700 CAMEL-12333 - Fix MllpTcpConsumer to only reset connections when they are idle for the specified timeout --- .../camel-mllp/src/main/docs/mllp-component.adoc | 1 + .../org/apache/camel/component/mllp/MllpEndpoint.java | 6 +++++- .../camel/component/mllp/MllpTcpServerConsumer.java | 18 ++++++++++++------ 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/components/camel-mllp/src/main/docs/mllp-component.adoc b/components/camel-mllp/src/main/docs/mllp-component.adoc index 47255ae..fd82a56 100644 --- a/components/camel-mllp/src/main/docs/mllp-component.adoc +++ b/components/camel-mllp/src/main/docs/mllp-component.adoc @@ -172,6 +172,7 @@ by these properties on the Camel exchange: |*Key* |*Type* |*Description* |CamelMllpAcknowledgement | byte[] | If present, this property will be sent to client as the MLLP Acknowledgement |CamelMllpAcknowledgementString | String | If present and CamelMllpAcknowledgement is not present, this property will be sent to client as the MLLP Acknowledgement +|CamelMllpAcknowledgementMsaText | String | If neither CamelMllpAcknowledgement nor CamelMllpAcknowledgementString are present and autoAck is true, this property can be used to specify the contents of MSA-3 in the generated HL7 acknowledgement |CamelMllpAcknowledgementType | String | If neither CamelMllpAcknowledgement nor CamelMllpAcknowledgementString are present and autoAck is true, this property can be used to specify the HL7 acknowledgement type (i.e. 
AA, AE, AR) |CamelMllpAutoAcknowledge | Boolean | Overrides the autoAck query parameter diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java index 803a188..ae8975b 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java @@ -176,7 +176,11 @@ public class MllpEndpoint extends DefaultEndpoint { } public void updateLastConnectionActivityTicks() { - lastConnectionActivityTicks = System.currentTimeMillis(); + updateLastConnectionActivityTicks(System.currentTimeMillis()); + } + + public void updateLastConnectionActivityTicks(long epochTicks) { + lastConnectionActivityTicks = epochTicks; } public void updateLastConnectionEstablishedTicks() { diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java index f2fb10a..4344dfb 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java @@ -171,11 +171,11 @@ public class MllpTcpServerConsumer extends DefaultConsumer { public void handleMessageTimeout(String message, byte[] payload, Throwable cause) { - super.handleException(new MllpInvalidMessageException(message, payload, cause)); + getExceptionHandler().handleException(new MllpInvalidMessageException(message, payload, cause)); } public void handleMessageException(String message, byte[] payload, Throwable cause) { - super.handleException(new MllpReceiveException(message, payload, cause)); + getExceptionHandler().handleException(new MllpReceiveException(message, payload, cause)); } public MllpConfiguration getConfiguration() 
{ @@ -213,6 +213,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { try { log.info("Starting consumer for Socket {}", clientSocket); consumerExecutor.submit(client); + getEndpoint().updateLastConnectionEstablishedTicks(); } catch (RejectedExecutionException rejectedExecutionEx) { log.warn("Cannot start consumer - max consumers already active"); mllpBuffer.resetSocket(clientSocket); @@ -220,7 +221,10 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } public void processMessage(byte[] hl7MessageBytes, TcpSocketConsumerRunnable consumerRunnable) { - getEndpoint().updateLastConnectionActivityTicks(); + long now = System.currentTimeMillis(); + + getEndpoint().updateLastConnectionActivityTicks(now); + consumerRunnables.put(consumerRunnable, now); // Send the message on to Camel for processing and wait for the response log.debug("Populating the exchange with received message"); @@ -268,7 +272,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer { getProcessor().process(exchange); sendAcknowledgement(hl7MessageBytes, exchange, consumerRunnable); } catch (Exception unexpectedEx) { - getExceptionHandler().handleException("Unexpected exception processing exchange", exchange, unexpectedEx); + String resetMessage = "Unexpected exception processing exchange"; + consumerRunnable.resetSocket(resetMessage); + getExceptionHandler().handleException(resetMessage, exchange, unexpectedEx); } } catch (Exception uowEx) { getExceptionHandler().handleException("Unexpected exception creating Unit of Work", exchange, uowEx); @@ -408,7 +414,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { + MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING + "(type = " + acknowledgementBytesProperty.getClass().getSimpleName() + ") exchange properties can be converted to byte[]"; MllpInvalidAcknowledgementException invalidAckEx = new MllpInvalidAcknowledgementException(exceptionMessage, originalHl7MessageBytes, acknowledgementMessageBytes); 
exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, invalidAckEx); - handleException(invalidAckEx); + getExceptionHandler().handleException(invalidAckEx); } else { String acknowledgmentTypeProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, String.class); String msa3 = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_MSA_TEXT, String.class); @@ -451,7 +457,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } catch (MllpAcknowledgementGenerationException ackGenerationException) { exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, ackGenerationException); - handleException(ackGenerationException); + getExceptionHandler().handleException(ackGenerationException); } } } else { -- To stop receiving notification emails like this one, please contact qu...@apache.org.