This is an automated email from the ASF dual-hosted git repository.

quinn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 89bcfa0  CAMEL-12333 - Fix MllpTcpConsumer to only reset connections 
when they are idle for the specified timeout
89bcfa0 is described below

commit 89bcfa0753233f8b576154486a89a2677d9d3efd
Author: Quinn Stevenson <qu...@apache.org>
AuthorDate: Wed Mar 7 14:49:16 2018 -0700

    CAMEL-12333 - Fix MllpTcpConsumer to only reset connections when they are 
idle for the specified timeout
---
 .../camel-mllp/src/main/docs/mllp-component.adoc       |  1 +
 .../org/apache/camel/component/mllp/MllpEndpoint.java  |  6 +++++-
 .../camel/component/mllp/MllpTcpServerConsumer.java    | 18 ++++++++++++------
 3 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/components/camel-mllp/src/main/docs/mllp-component.adoc 
b/components/camel-mllp/src/main/docs/mllp-component.adoc
index 47255ae..fd82a56 100644
--- a/components/camel-mllp/src/main/docs/mllp-component.adoc
+++ b/components/camel-mllp/src/main/docs/mllp-component.adoc
@@ -172,6 +172,7 @@ by these properties on the Camel exchange:
 |*Key* |*Type* |*Description*
 |CamelMllpAcknowledgement | byte[] | If present, this property will we sent to 
client as the MLLP Acknowledgement
 |CamelMllpAcknowledgementString | String | If present and 
CamelMllpAcknowledgement is not present, this property will we sent to client 
as the MLLP Acknowledgement
+|CamelMllpAcknowledgementMsaText | String | If neither 
CamelMllpAcknowledgement or CamelMllpAcknowledgementString are present and 
autoAck is true, this property can be used to specify the the contents of MSA-3 
in the generated HL7 acknowledgement
 |CamelMllpAcknowledgementType | String  | If neither CamelMllpAcknowledgement 
or CamelMllpAcknowledgementString are present and autoAck is true, this 
property can be used to specify the HL7 acknowledgement type (i.e. AA, AE, AR)
 |CamelMllpAutoAcknowledge | Boolean | Overrides the autoAck query parameter
 
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index 803a188..ae8975b 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -176,7 +176,11 @@ public class MllpEndpoint extends DefaultEndpoint {
     }
 
     public void updateLastConnectionActivityTicks() {
-        lastConnectionActivityTicks = System.currentTimeMillis();
+        updateLastConnectionActivityTicks(System.currentTimeMillis());
+    }
+
+    public void updateLastConnectionActivityTicks(long epochTicks) {
+        lastConnectionActivityTicks = epochTicks;
     }
 
     public void updateLastConnectionEstablishedTicks() {
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index f2fb10a..4344dfb 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -171,11 +171,11 @@ public class MllpTcpServerConsumer extends 
DefaultConsumer {
 
 
     public void handleMessageTimeout(String message, byte[] payload, Throwable 
cause) {
-        super.handleException(new MllpInvalidMessageException(message, 
payload, cause));
+        getExceptionHandler().handleException(new 
MllpInvalidMessageException(message, payload, cause));
     }
 
     public void handleMessageException(String message, byte[] payload, 
Throwable cause) {
-        super.handleException(new MllpReceiveException(message, payload, 
cause));
+        getExceptionHandler().handleException(new 
MllpReceiveException(message, payload, cause));
     }
 
     public MllpConfiguration getConfiguration() {
@@ -213,6 +213,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         try {
             log.info("Starting consumer for Socket {}", clientSocket);
             consumerExecutor.submit(client);
+            getEndpoint().updateLastConnectionEstablishedTicks();
         } catch (RejectedExecutionException rejectedExecutionEx) {
             log.warn("Cannot start consumer - max consumers already active");
             mllpBuffer.resetSocket(clientSocket);
@@ -220,7 +221,10 @@ public class MllpTcpServerConsumer extends DefaultConsumer 
{
     }
 
     public void processMessage(byte[] hl7MessageBytes, 
TcpSocketConsumerRunnable consumerRunnable) {
-        getEndpoint().updateLastConnectionActivityTicks();
+        long now = System.currentTimeMillis();
+
+        getEndpoint().updateLastConnectionActivityTicks(now);
+        consumerRunnables.put(consumerRunnable, now);
 
         // Send the message on to Camel for processing and wait for the 
response
         log.debug("Populating the exchange with received message");
@@ -268,7 +272,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                 getProcessor().process(exchange);
                 sendAcknowledgement(hl7MessageBytes, exchange, 
consumerRunnable);
             } catch (Exception unexpectedEx) {
-                getExceptionHandler().handleException("Unexpected exception 
processing exchange", exchange, unexpectedEx);
+                String resetMessage = "Unexpected exception processing 
exchange";
+                consumerRunnable.resetSocket(resetMessage);
+                getExceptionHandler().handleException(resetMessage, exchange, 
unexpectedEx);
             }
         } catch (Exception uowEx) {
             getExceptionHandler().handleException("Unexpected exception 
creating Unit of Work", exchange, uowEx);
@@ -408,7 +414,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                         + MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING + "(type = 
" + acknowledgementBytesProperty.getClass().getSimpleName() + ") exchange 
properties can be converted to byte[]";
                 MllpInvalidAcknowledgementException invalidAckEx = new 
MllpInvalidAcknowledgementException(exceptionMessage, originalHl7MessageBytes, 
acknowledgementMessageBytes);
                 
exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, 
invalidAckEx);
-                handleException(invalidAckEx);
+                getExceptionHandler().handleException(invalidAckEx);
             } else {
                 String acknowledgmentTypeProperty = 
exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
                 String msa3 = 
exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_MSA_TEXT, String.class);
@@ -451,7 +457,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
 
                 } catch (MllpAcknowledgementGenerationException 
ackGenerationException) {
                     
exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, 
ackGenerationException);
-                    handleException(ackGenerationException);
+                    
getExceptionHandler().handleException(ackGenerationException);
                 }
             }
         } else {

-- 
To stop receiving notification emails like this one, please contact
qu...@apache.org.

Reply via email to