CAMEL-10511: Updated MllpTcpClientProducer and MllpTcpServerConsumer to consume all available data on socket
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/91121843 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/91121843 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/91121843 Branch: refs/heads/master Commit: 91121843801aed6411b7c91da3b1e502e2d35009 Parents: c03b2cd Author: Quinn Stevenson <qu...@pronoia-solutions.com> Authored: Tue Dec 6 12:23:03 2016 -0700 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Dec 15 20:20:20 2016 +0100 ---------------------------------------------------------------------- .../MllpAcknowledgementDeliveryException.java | 8 + .../mllp/MllpAcknowledgementException.java | 42 +- .../MllpAcknowledgementTimeoutException.java | 69 +++ .../MllpAcknowledgementTimoutException.java | 30 -- ...pplicationErrorAcknowledgementException.java | 7 +- ...plicationRejectAcknowledgementException.java | 7 +- ...MllpCommitErrorAcknowledgementException.java | 7 +- ...llpCommitRejectAcknowledgementException.java | 7 +- .../camel/component/mllp/MllpComponent.java | 38 ++ .../camel/component/mllp/MllpConstants.java | 1 + .../camel/component/mllp/MllpEndpoint.java | 40 ++ .../camel/component/mllp/MllpException.java | 81 ++- .../component/mllp/MllpFrameException.java | 39 +- .../mllp/MllpInvalidMessageException.java | 30 ++ .../MllpNegativeAcknowledgementException.java | 32 ++ .../MllpReceiveAcknowledgementException.java | 57 ++ .../component/mllp/MllpReceiveException.java | 46 ++ .../component/mllp/MllpTcpClientProducer.java | 208 ++++---- .../component/mllp/MllpTcpServerConsumer.java | 319 ++++++++--- .../component/mllp/MllpTimeoutException.java | 50 +- .../component/mllp/MllpWriteException.java | 37 +- .../AcknowledgmentSynchronizationAdapter.java | 212 -------- .../camel/component/mllp/impl/Hl7Util.java | 90 ++++ .../mllp/impl/MllpBufferedSocketWriter.java | 123 +++++ .../component/mllp/impl/MllpSocketReader.java | 290 ++++++++++ .../component/mllp/impl/MllpSocketUtil.java | 230 ++++++++ .../component/mllp/impl/MllpSocketWriter.java | 143 +++++ .../camel/component/mllp/impl/MllpUtil.java | 390 -------------- .../mllp/Hl7AcknowledgementGenerator.java | 1 + .../mllp/MllpAcknowledgementExceptionTest.java | 127 ----- .../camel/component/mllp/MllpExceptionTest.java | 114 ++++ .../component/mllp/MllpFrameExceptionTest.java | 101 ---- .../mllp/MllpProducerConsumerLoopbackTest.java | 16 +- .../MllpTcpClientConsumerBlueprintTest.java | 36 -- ...llpTcpClientProducerAcknowledgementTest.java | 207 ++++++- ...ntProducerAcknowledgementValidationTest.java | 283 ++++++++++ .../MllpTcpClientProducerBlueprintTest.java | 14 +- ...llpTcpClientProducerConnectionErrorTest.java | 165 ++++++ .../mllp/MllpTcpClientProducerTest.java | 82 ++- ...llpTcpServerConsumerAcknowledgementTest.java | 5 +- .../MllpTcpServerConsumerConnectionTest.java | 35 +- ...pTcpServerConsumerMessageValidationTest.java | 317 +++++++++++ .../mllp/MllpTcpServerConsumerTest.java | 128 ++++- .../MllpTcpServerProducerBlueprintTest.java | 35 -- .../mllp/MllpTimeoutExceptionTest.java | 101 ---- .../component/mllp/MllpWriteExceptionTest.java | 101 ---- .../camel/component/mllp/impl/Hl7UtilTest.java | 126 +++++ ...BufferedSocketAcknowledgementWriterTest.java | 125 +++++ .../MllpBufferedSocketMessageWriterTest.java | 126 +++++ .../MllpSocketAcknowledgementReaderTest.java | 533 +++++++++++++++++++ .../MllpSocketAcknowledgementWriterTest.java | 150 ++++++ .../mllp/impl/MllpSocketMessageReaderTest.java | 527 ++++++++++++++++++ .../mllp/impl/MllpSocketMessageWriterTest.java | 150 ++++++ .../mllp/impl/MllpSocketReaderTestSupport.java | 342 ++++++++++++ .../MllpSocketUtilExceptionHandlingTest.java | 133 +++++ .../impl/MllpSocketUtilFindXxxOfBlockTest.java | 241 +++++++++ .../mllp/impl/MllpSocketUtilSocketTest.java | 288 ++++++++++ .../mllp/impl/MllpSocketWriterTestSupport.java | 132 +++++ .../junit/rule/mllp/MllpServerResource.java | 497 +++++++++++------ .../apache/camel/test/util/PayloadBuilder.java | 211 ++++++++ .../src/test/resources/log4j2.properties | 10 +- 61 files changed, 6423 insertions(+), 1669 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java index 4ac46c9..c6fc878 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java @@ -29,4 +29,12 @@ public class MllpAcknowledgementDeliveryException extends MllpAcknowledgementExc public MllpAcknowledgementDeliveryException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause); } + + public MllpAcknowledgementDeliveryException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) { + super(message, hl7Message, hl7Acknowledgement); + } + + public MllpAcknowledgementDeliveryException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { + super(message, hl7Message, hl7Acknowledgement, cause); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java index 992d506..296430b 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java @@ -20,49 +20,25 @@ package org.apache.camel.component.mllp; * Base class for HL7 Application Acknowledgement Exceptions */ public abstract class MllpAcknowledgementException extends MllpException { - private final byte[] hl7Message; - private final byte[] hl7Acknowledgement; - public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) { + public MllpAcknowledgementException(String message) { super(message); - this.hl7Message = hl7Message; - this.hl7Acknowledgement = hl7Acknowledgement; } - public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { - super(message, cause); - this.hl7Message = hl7Message; - this.hl7Acknowledgement = hl7Acknowledgement; + public MllpAcknowledgementException(String message, byte[] hl7Message) { + super(message, hl7Message); } - public byte[] getHl7Message() { - return hl7Message; + public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) { + super(message, hl7Message, hl7Acknowledgement); } - public byte[] getHl7Acknowledgement() { - return hl7Acknowledgement; + public MllpAcknowledgementException(String message, byte[] hl7Message, Throwable cause) { + super(message, hl7Message, cause); } - @Override - public String getMessage() { - if (isLogPhi()) { - return String.format("%s:\n\tHL7 Message: %s\n\tHL7 Acknowledgement: %s", - super.getMessage(), covertBytesToPrintFriendlyString(hl7Message), covertBytesToPrintFriendlyString(hl7Acknowledgement)); - } else { - return super.getMessage(); - } + public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { + super(message, hl7Message, hl7Acknowledgement, cause); } - @Override - public String toString() { - StringBuilder stringBuilder = new StringBuilder(this.getClass().getName()); - - stringBuilder.append(": {hl7Message=") - .append(covertBytesToPrintFriendlyString(hl7Message)) - .append(", hl7Acknowledgement=") - .append(covertBytesToPrintFriendlyString(hl7Acknowledgement)) - .append("}"); - - return stringBuilder.toString(); - } } http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java new file mode 100644 index 0000000..8cb2822 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Raised when a MLLP Producer does not receive a HL7 acknowledgement within the configured timespan + */ +public class MllpAcknowledgementTimeoutException extends MllpAcknowledgementException { + static final String EXCEPTION_MESSAGE = "Timeout receiving HL7 Acknowledgement"; + + public MllpAcknowledgementTimeoutException(byte[] hl7Message) { + super(EXCEPTION_MESSAGE, hl7Message); + } + + public MllpAcknowledgementTimeoutException(byte[] hl7Message, byte[] partialHl7Acknowledgement) { + super(EXCEPTION_MESSAGE, hl7Message, partialHl7Acknowledgement); + } + + public MllpAcknowledgementTimeoutException(byte[] hl7Message, Throwable cause) { + super(EXCEPTION_MESSAGE, hl7Message, cause); + } + + public MllpAcknowledgementTimeoutException(byte[] hl7Message, byte[] partialHl7Acknowledgement, Throwable cause) { + super(EXCEPTION_MESSAGE, hl7Message, partialHl7Acknowledgement, cause); + } + + public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message) { + super(message, hl7Message); + } + + public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement) { + super(message, hl7Message, partialHl7Acknowledgement); + } + + public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, Throwable cause) { + super(message, hl7Message, cause); + } + + public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement, Throwable cause) { + super(message, hl7Message, partialHl7Acknowledgement, cause); + } + + /** + * Get the HL7 acknowledgement payload associated with this exception, if any. + * + * @return If the timeout occurred while attempting to receive an HL7 Message, this will be null. If the timeout + * occurred while attempting to receive an HL7 Acknowledgement, this will be the HL7 Message. If the timeout occurred + * while attempting to complete the read of an HL7 message (i.e. part of the message has already been read), this + * will be the partial acknowledgement payload that was read before the timeout. + */ + public byte[] getHl7Acknowledgement() { + return super.getHl7Acknowledgement(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java deleted file mode 100644 index a1cf147..0000000 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.mllp; - -/** - * Raised when a MLLP Producer does not receive a HL7 acknowledgement within the configured timespan - */ -public class MllpAcknowledgementTimoutException extends MllpTimeoutException { - public MllpAcknowledgementTimoutException(String message, byte[] hl7Message) { - super(message, hl7Message); - } - - public MllpAcknowledgementTimoutException(String message, byte[] hl7Message, Throwable cause) { - super(message, hl7Message, cause); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java index 5b6fed1..c8db47a 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java @@ -19,7 +19,7 @@ package org.apache.camel.component.mllp; /** * Raised when a MLLP Producer receives a HL7 Application Error Acknowledgement */ -public class MllpApplicationErrorAcknowledgementException extends MllpAcknowledgementException { +public class MllpApplicationErrorAcknowledgementException extends MllpNegativeAcknowledgementException { static final String EXCEPTION_MESSAGE = "HL7 Application Error Acknowledgment Received"; public MllpApplicationErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) { @@ -29,4 +29,9 @@ public class MllpApplicationErrorAcknowledgementException extends MllpAcknowledg public MllpApplicationErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause); } + + @Override + public String getAcknowledgmentType() { + return "AE"; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java index 76cbf2c..00dd3e4 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java @@ -19,7 +19,7 @@ package org.apache.camel.component.mllp; /** * Raised when a MLLP Producer receives a HL7 Application Reject Acknowledgement */ -public class MllpApplicationRejectAcknowledgementException extends MllpAcknowledgementException { +public class MllpApplicationRejectAcknowledgementException extends MllpNegativeAcknowledgementException { static final String EXCEPTION_MESSAGE = "HL7 Application Reject Acknowledgment Received"; public MllpApplicationRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) { @@ -29,4 +29,9 @@ public class MllpApplicationRejectAcknowledgementException extends MllpAcknowled public MllpApplicationRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause); } + + @Override + public String getAcknowledgmentType() { + return "AR"; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java index aef1a27..bec34d9 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java @@ -19,7 +19,7 @@ package org.apache.camel.component.mllp; /** * Raised when a MLLP Producer receives a HL7 Commit Error Acknowledgement */ -public class MllpCommitErrorAcknowledgementException extends MllpAcknowledgementException { +public class MllpCommitErrorAcknowledgementException extends MllpNegativeAcknowledgementException { static final String EXCEPTION_MESSAGE = "HL7 Commit Error Acknowledgment Received"; public MllpCommitErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) { @@ -29,4 +29,9 @@ public class MllpCommitErrorAcknowledgementException extends MllpAcknowledgement public MllpCommitErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause); } + + @Override + public String getAcknowledgmentType() { + return "CE"; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java index 3907694..f98ee5e 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java @@ -19,7 +19,7 @@ package org.apache.camel.component.mllp; /** * Raised when a MLLP Producer receives a HL7 Commit Reject Acknowledgement */ -public class MllpCommitRejectAcknowledgementException extends MllpAcknowledgementException { +public class MllpCommitRejectAcknowledgementException extends MllpNegativeAcknowledgementException { static final String EXCEPTION_MESSAGE = "HL7 Commit Reject Acknowledgment Received"; public MllpCommitRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) { @@ -29,4 +29,9 @@ public class MllpCommitRejectAcknowledgementException extends MllpAcknowledgemen public MllpCommitRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause); } + + @Override + public String getAcknowledgmentType() { + return "CR"; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java index b23a421..f3e22b1 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java @@ -22,6 +22,10 @@ import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK; +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA; +import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK; + /** * Represents the component that manages {@link MllpEndpoint}. */ @@ -64,4 +68,38 @@ public class MllpComponent extends UriEndpointComponent { return endpoint; } + public static boolean isLogPhi() { + String logPhiProperty = System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true"); + return Boolean.valueOf(logPhiProperty); + } + + public static String covertToPrintFriendlyString(String hl7Message) { + if (hl7Message == null) { + return "null"; + } else if (hl7Message.isEmpty()) { + return "empty"; + } + + return hl7Message.replaceAll("" + START_OF_BLOCK, "<VT>").replaceAll("" + END_OF_BLOCK, "<FS>").replaceAll("\r", "<CR>").replaceAll("\n", "<LF>"); + } + + public static String covertBytesToPrintFriendlyString(byte[] hl7Bytes) { + if (hl7Bytes == null) { + return "null"; + } else if (hl7Bytes.length == 0) { + return ""; + } + + return covertBytesToPrintFriendlyString(hl7Bytes, 0, hl7Bytes.length); + } + + public static String covertBytesToPrintFriendlyString(byte[] hl7Bytes, int startPosition, int length) { + if (null == hl7Bytes) { + return "null"; + } else if (hl7Bytes.length == 0) { + return ""; + } + return covertToPrintFriendlyString(new String(hl7Bytes, startPosition, length)); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java index 8275a0e..4e14da0 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java @@ -21,6 +21,7 @@ public final class MllpConstants { public static final String MLLP_REMOTE_ADDRESS = "CamelMllpRemoteAddress"; public static final String MLLP_ACKNOWLEDGEMENT = "CamelMllpAcknowledgement"; + public static final String MLLP_ACKNOWLEDGEMENT_STRING = "CamelMllpAcknowledgementString"; public static final String MLLP_ACKNOWLEDGEMENT_TYPE = "CamelMllpAcknowledgementType"; public static final String MLLP_ACKNOWLEDGEMENT_EXCEPTION = "CamelMllpAcknowledgementException"; http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java index ae2e01f..3732254 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java @@ -100,6 +100,12 @@ public class MllpEndpoint extends DefaultEndpoint { @UriParam(defaultValue = "true") boolean hl7Headers = true; + @UriParam(defaultValue = "true") + boolean bufferWrites = true; + + @UriParam(defaultValue = "false") + boolean validatePayload; + @UriParam(label = "codec") String charsetName; @@ -391,4 +397,38 @@ public class MllpEndpoint extends DefaultEndpoint { public void setHl7Headers(boolean hl7Headers) { this.hl7Headers = hl7Headers; } + + public boolean isValidatePayload() { + return validatePayload; + } + + /** + * Enable/Disable the validation of HL7 Payloads + * + * If enabled, HL7 Payloads received from external systems will be validated (see Hl7Util.generateInvalidPayloadExceptionMessage for details on the validation). + * If and invalid payload is detected, a MllpInvalidMessageException (for consumers) or a MllpInvalidAcknowledgementException will be thrown. + * + * @param validatePayload enabled if true, otherwise disabled + */ + public void setValidatePayload(boolean validatePayload) { + this.validatePayload = validatePayload; + } + + public boolean isBufferWrites() { + return bufferWrites; + } + + /** + * Enable/Disable the validation of HL7 Payloads + * + * If enabled, MLLP Payloads are buffered and written to the external system in a single write(byte[]) operation. + * If disabled, the MLLP payload will not be buffered, and three write operations will be used. The first operation + * will write the MLLP start-of-block character {0x0b (ASCII VT)}, the second operation will write the HL7 payload, and the + * third operation will writh the MLLP end-of-block character and the MLLP end-of-data character {[0x1c, 0x0d] (ASCII [FS, CR])}. + * + * @param bufferWrites enabled if true, otherwise disabled + */ + public void setBufferWrites(boolean bufferWrites) { + this.bufferWrites = bufferWrites; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java index f0d5e84..0d2c1ce 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java @@ -20,25 +20,88 @@ package org.apache.camel.component.mllp; * Base class for all MLLP Exceptions, and also used as a generic MLLP exception */ public class MllpException extends Exception { + private final byte[] hl7Message; + private final byte[] hl7Acknowledgement; + public MllpException(String message) { super(message); + this.hl7Message = null; + this.hl7Acknowledgement = null; + } + + public MllpException(String message, byte[] hl7Message) { + super(message); + this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null; + this.hl7Acknowledgement = null; + } + + public MllpException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) { + super(message); + this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null; + this.hl7Acknowledgement = (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : null; } public MllpException(String message, Throwable cause) { super(message, cause); + this.hl7Message = null; + this.hl7Acknowledgement = null; } - public boolean isLogPhi() { - String logPhiProperty = System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true"); - return Boolean.valueOf(logPhiProperty); + public MllpException(String message, byte[] hl7Message, Throwable cause) { + super(message, cause); + this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null; + this.hl7Acknowledgement = null; } - protected String covertBytesToPrintFriendlyString(byte[] hl7Bytes) { - if (null == hl7Bytes) { - return "null"; - } else if (hl7Bytes.length == 0) { - return ""; + public MllpException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { + super(message, cause); + this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null; + this.hl7Acknowledgement = (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : null; + } + + + /** + * Get the HL7 message payload associated with this exception, if any. + * + * @return HL7 message payload + */ + public byte[] getHl7Message() { + return hl7Message; + } + + /** + * Get the HL7 acknowledgement payload associated with this exception, if any. + * + * @return HL7 acknowledgement payload + */ + public byte[] getHl7Acknowledgement() { + return hl7Acknowledgement; + } + + /** + * Override the base version of this method, and include the HL7 Message and Acknowledgement, if any + * + * @return the detail message of this MLLP Exception + */ + @Override + public String getMessage() { + if (MllpComponent.isLogPhi()) { + return String.format("%s \n\t{hl7Message= %s} \n\t{hl7Acknowledgement= %s}", + super.getMessage(), MllpComponent.covertBytesToPrintFriendlyString(hl7Message), MllpComponent.covertBytesToPrintFriendlyString(hl7Acknowledgement)); + } else { + return super.getMessage(); } - return new String(hl7Bytes).replaceAll("\r", "<CR>").replaceAll("\n", "<LF>"); } + + /** + * Return the MLLP Payload that is most likely the cause of the Exception + * + * If the HL7 Acknowledgement is present, return it. Otherwise, return the HL7 Message. + * + * @return the MLLP Payload with the framing error + */ + public byte[] getMllpPayload() { + return (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : hl7Message; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java index 43b1281..7b7a3c4 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java @@ -17,44 +17,25 @@ package org.apache.camel.component.mllp; /** + * @deprecated - replaced by more specific exceptions. MllpTimeoutException, MllpInvalidMessageException and MllpInvalidAcknowledgementException * Raised when a MLLP Producer or consumer encounters a corrupt MLLP Frame while attempting - * to read or write a MLLP payload. + * to readEnvelopedPayload or writeEnvelopedMessage a MLLP payload. */ public class MllpFrameException extends MllpException { - private final byte[] mllpPayload; - - public MllpFrameException(String message, byte[] mllpPayload) { - super(message); - this.mllpPayload = mllpPayload; - } - - public MllpFrameException(String message, byte[] mllpPayload, Throwable cause) { - super(message, cause); - this.mllpPayload = mllpPayload; + public MllpFrameException(String message, byte[] hl7Message) { + super(message, hl7Message); } - public byte[] getMllpPayload() { - return mllpPayload; + public MllpFrameException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) { + super(message, hl7Message, hl7Acknowledgement); } - @Override - public String getMessage() { - if (isLogPhi()) { - return String.format("%s:\n\tMLLP Payload: %s", super.getMessage(), covertBytesToPrintFriendlyString(mllpPayload)); - } else { - return super.getMessage(); - } + public MllpFrameException(String message, byte[] hl7Message, Throwable cause) { + super(message, hl7Message, cause); } - @Override - public String toString() { - StringBuilder stringBuilder = new StringBuilder(this.getClass().getName()); - - stringBuilder.append(": {mllpPayload=") - .append(covertBytesToPrintFriendlyString(mllpPayload)) - .append("}"); - - return stringBuilder.toString(); + public MllpFrameException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { + super(message, hl7Message, hl7Acknowledgement, cause); } } http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java new file mode 100644 index 0000000..9cde29a --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Raised when a MLLP Consumer receives an invalid HL7 Message. + */ +public class MllpInvalidMessageException extends MllpException { + public MllpInvalidMessageException(String message, byte[] hl7Message) { + super(message, hl7Message); + } + + public MllpInvalidMessageException(String message, byte[] hl7Message, Throwable cause) { + super(message, hl7Message, cause); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java new file mode 100644 index 0000000..6b8a4f8 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Abstract base for all MLLP Negative Acknowledgements + */ +public abstract class MllpNegativeAcknowledgementException extends MllpAcknowledgementException { + public MllpNegativeAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) { + super(message, hl7Message, hl7Acknowledgement); + } + + public MllpNegativeAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { + super(message, hl7Message, hl7Acknowledgement, cause); + } + + public abstract String getAcknowledgmentType(); +} http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java new file mode 100644 index 0000000..2268137 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Raised when a MLLP Consumer cannot deliver the MLLP Acknowledgement + */ +public class MllpReceiveAcknowledgementException extends MllpAcknowledgementException { + static final String EXCEPTION_MESSAGE = "HL7 Acknowledgment Receipt Failed"; + + public MllpReceiveAcknowledgementException(byte[] hl7Message) { + super(EXCEPTION_MESSAGE, hl7Message); + } + + + public MllpReceiveAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) { + super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement); + } + + public MllpReceiveAcknowledgementException(byte[] hl7Message, Throwable cause) { + super(EXCEPTION_MESSAGE, hl7Message, cause); + } + + + public MllpReceiveAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { + super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause); + } + + public MllpReceiveAcknowledgementException(String message, byte[] hl7Message) { + super(message, hl7Message); + } + + public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) { + super(message, hl7Message, hl7Acknowledgement); + } + + public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, Throwable cause) { + super(message, hl7Message, cause); + } + public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { + super(message, hl7Message, hl7Acknowledgement, cause); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java new file mode 100644 index 0000000..c755d02 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Raised when a MLLP Consumer cannot deliver the MLLP Acknowledgement + */ +public class MllpReceiveException extends MllpException { + public MllpReceiveException(String message) { + super(message); + } + + public MllpReceiveException(String message, byte[] hl7Message) { + super(message, hl7Message); + } + + public MllpReceiveException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) { + super(message, hl7Message, hl7Acknowledgement); + } + + public MllpReceiveException(String message, Throwable cause) { + super(message, cause); + } + + public MllpReceiveException(String message, byte[] hl7Message, Throwable cause) { + super(message, hl7Message, cause); + } + + public MllpReceiveException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { + super(message, hl7Message, hl7Acknowledgement, cause); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/91121843/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java index 5793aea..69176bc 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java @@ -20,14 +20,19 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; -import java.net.SocketTimeoutException; import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.component.mllp.impl.MllpUtil; +import org.apache.camel.component.mllp.impl.Hl7Util; +import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter; +import org.apache.camel.component.mllp.impl.MllpSocketReader; +import org.apache.camel.component.mllp.impl.MllpSocketUtil; +import org.apache.camel.component.mllp.impl.MllpSocketWriter; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.IOHelper; import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING; import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE; import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_AFTER_SEND; import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_BEFORE_SEND; @@ -45,6 +50,9 @@ public class MllpTcpClientProducer extends DefaultProducer { Socket socket; + MllpSocketReader mllpSocketReader; + MllpSocketWriter mllpSocketWriter; + public MllpTcpClientProducer(MllpEndpoint endpoint) throws SocketException { super(endpoint); log.trace("MllpTcpClientProducer(endpoint)"); @@ -63,7 +71,7 @@ public class MllpTcpClientProducer extends DefaultProducer { protected void doStop() throws Exception { log.trace("doStop()"); - MllpUtil.closeConnection(socket); + MllpSocketUtil.close(socket, log, "Stopping component"); super.doStop(); } @@ -74,15 +82,18 @@ public class MllpTcpClientProducer extends DefaultProducer { // Check BEFORE_SEND Properties if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)) { - MllpUtil.resetConnection(socket); + MllpSocketUtil.reset(socket, log, "Exchange property " + MLLP_RESET_CONNECTION_BEFORE_SEND + " = " + exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)); return; } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)) { - MllpUtil.closeConnection(socket); + MllpSocketUtil.close(socket, log, "Exchange property " + MLLP_CLOSE_CONNECTION_BEFORE_SEND + " = " + exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)); + return; } - Exception connectionException = checkConnection(); - if (null != connectionException) { - exchange.setException(connectionException); + // Establish a connection if needed + try { + checkConnection(); + } catch (IOException ioEx) { + exchange.setException(ioEx); return; } @@ -93,95 +104,118 @@ public class MllpTcpClientProducer extends DefaultProducer { message = exchange.getIn(); } + message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString()); + message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress().toString()); + + // Send the message to the external system byte[] hl7MessageBytes = message.getMandatoryBody(byte[].class); + byte[] acknowledgementBytes = null; - log.debug("Sending message to external system"); try { - MllpUtil.writeFramedPayload(socket, hl7MessageBytes); + log.debug("Sending message to external system"); + mllpSocketWriter.writeEnvelopedPayload(hl7MessageBytes, null); + log.debug("Reading acknowledgement from external system"); + acknowledgementBytes = mllpSocketReader.readEnvelopedPayload(hl7MessageBytes); + } catch (MllpWriteException writeEx) { + MllpSocketUtil.reset(socket, log, writeEx.getMessage()); + exchange.setException(writeEx); + return; + } catch (MllpReceiveException ackReceiveEx) { + MllpSocketUtil.reset(socket, log, ackReceiveEx.getMessage()); + exchange.setException(ackReceiveEx); + return; } catch (MllpException mllpEx) { + Throwable mllpExCause = mllpEx.getCause(); + if (mllpExCause != null && mllpExCause instanceof IOException) { + MllpSocketUtil.reset(socket, log, mllpEx.getMessage()); + } exchange.setException(mllpEx); return; } - log.debug("Reading acknowledgement from external system"); - byte[] acknowledgementBytes = null; - try { - if (MllpUtil.openFrame(socket, endpoint.receiveTimeout, endpoint.readTimeout)) { - acknowledgementBytes = MllpUtil.closeFrame(socket, endpoint.receiveTimeout, endpoint.readTimeout); + log.debug("Populating message headers with the acknowledgement from the external system"); + message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes); + message.setHeader(MLLP_ACKNOWLEDGEMENT_STRING, new String(acknowledgementBytes, IOHelper.getCharsetName(exchange, true))); + + if (endpoint.validatePayload) { + String exceptionMessage = Hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes); + if (exceptionMessage != null) { + exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, hl7MessageBytes, acknowledgementBytes)); + return; } - } catch (SocketTimeoutException timeoutEx) { - exchange.setException(new MllpAcknowledgementTimoutException("Acknowledgement timout", hl7MessageBytes, timeoutEx)); - return; + } + + log.debug("Processing the acknowledgement from the external system"); + try { + String acknowledgementType = processAcknowledgment(hl7MessageBytes, acknowledgementBytes); + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementType); } catch (MllpException mllpEx) { exchange.setException(mllpEx); return; } - if (null != acknowledgementBytes) { - log.debug("Populating the exchange with the acknowledgement from the external system"); - message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes); + // Check AFTER_SEND Properties + if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) { + MllpSocketUtil.reset(socket, log, "Exchange property " + MLLP_RESET_CONNECTION_AFTER_SEND + " = " + exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)); + } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) { + MllpSocketUtil.close(socket, log, "Exchange property " + MLLP_CLOSE_CONNECTION_AFTER_SEND + " = " + exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)); + } + } - message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString()); - message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress()); + private String processAcknowledgment(byte[] hl7MessageBytes, byte[] hl7AcknowledgementBytes) throws MllpException { + String acknowledgementType = ""; - // Now, extract the acknowledgement type and check for a NACK - byte fieldDelim = acknowledgementBytes[3]; + if (hl7AcknowledgementBytes != null && hl7AcknowledgementBytes.length > 3) { + // Extract the acknowledgement type and check for a NACK + byte fieldDelim = hl7AcknowledgementBytes[3]; // First, find the beginning of the MSA segment - should be the second segment int msaStartIndex = -1; - for (int i = 0; i < acknowledgementBytes.length; ++i) { - if (SEGMENT_DELIMITER == acknowledgementBytes[i]) { + for (int i = 0; i < hl7AcknowledgementBytes.length; ++i) { + if (SEGMENT_DELIMITER == hl7AcknowledgementBytes[i]) { final byte bM = 77; final byte bS = 83; final byte bC = 67; final byte bA = 65; final byte bE = 69; final byte bR = 82; - /* We've found the start of a new segment - make sure peeking ahead - won't run off the end of the array - we need at least 7 more bytes - */ - if (acknowledgementBytes.length > i + 7) { + /* We've found the start of a new segment - make sure peeking ahead + won't run off the end of the array - we need at least 7 more bytes + */ + if (hl7AcknowledgementBytes.length > i + 7) { // We can safely peek ahead - if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) { + if (bM == hl7AcknowledgementBytes[i + 1] && bS == hl7AcknowledgementBytes[i + 2] && bA == hl7AcknowledgementBytes[i + 3] && fieldDelim == hl7AcknowledgementBytes[i + 4]) { // Found the beginning of the MSA - the next two bytes should be our acknowledgement code msaStartIndex = i + 1; - if (bA != acknowledgementBytes[i + 5] && bC != acknowledgementBytes[i + 5]) { - exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes, i + 5, 2), hl7MessageBytes, acknowledgementBytes)); + if (bA != hl7AcknowledgementBytes[i + 5] && bC != hl7AcknowledgementBytes[i + 5]) { + String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2); + throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); } else { - String acknowledgemenTypeString; - switch (acknowledgementBytes[i + 6]) { + switch (hl7AcknowledgementBytes[i + 6]) { case bA: - // We have an AA or CA- make sure that's the end of the field - if (fieldDelim != acknowledgementBytes[i + 7]) { - exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes, i + 5, 3), hl7MessageBytes, acknowledgementBytes)); - } - if (bA == acknowledgementBytes[i + 5]) { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA"); + // We have an AA or CA + if (bA == hl7AcknowledgementBytes[i + 5]) { + acknowledgementType = "AA"; } else { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CA"); + acknowledgementType = "CA"; } break; case bE: // We have an AE or CE - if (bA == acknowledgementBytes[i + 5]) { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE"); - exchange.setException(new MllpApplicationErrorAcknowledgementException(hl7MessageBytes, acknowledgementBytes)); + if (bA == hl7AcknowledgementBytes[i + 5]) { + throw new MllpApplicationErrorAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes); } else { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CE"); - exchange.setException(new MllpCommitErrorAcknowledgementException(hl7MessageBytes, acknowledgementBytes)); + throw new MllpCommitErrorAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes); } - break; case bR: // We have an AR or CR - if (bA == acknowledgementBytes[i + 5]) { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR"); - exchange.setException(new MllpApplicationRejectAcknowledgementException(hl7MessageBytes, acknowledgementBytes)); + if (bA == hl7AcknowledgementBytes[i + 5]) { + throw new MllpApplicationRejectAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes); } else { - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CR"); - exchange.setException(new MllpCommitRejectAcknowledgementException(hl7MessageBytes, acknowledgementBytes)); + throw new MllpCommitRejectAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes); } - break; default: - exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes, i + 5, 2), hl7MessageBytes, acknowledgementBytes)); + String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2); + throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); } } @@ -191,18 +225,13 @@ public class MllpTcpClientProducer extends DefaultProducer { } } - if (-1 == msaStartIndex) { + if (-1 == msaStartIndex && endpoint.validatePayload) { // Didn't find an MSA - exchange.setException(new MllpInvalidAcknowledgementException("MSA Not found in acknowledgement", hl7MessageBytes, acknowledgementBytes)); + throw new MllpInvalidAcknowledgementException("MSA Not found in acknowledgement", hl7MessageBytes, hl7AcknowledgementBytes); } } - // Check AFTER_SEND Properties - if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) { - MllpUtil.resetConnection(socket); - return; - } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) { - MllpUtil.closeConnection(socket); - } + + return acknowledgementType; } /** @@ -210,28 +239,24 @@ public class MllpTcpClientProducer extends DefaultProducer { * * @return null if the connection is valid, otherwise the Exception encounted checking the connection */ - Exception checkConnection() { + void checkConnection() throws IOException { if (null == socket || socket.isClosed() || !socket.isConnected()) { socket = new Socket(); - try { - socket.setKeepAlive(endpoint.keepAlive); - socket.setTcpNoDelay(endpoint.tcpNoDelay); - if (null != endpoint.receiveBufferSize) { - socket.setReceiveBufferSize(endpoint.receiveBufferSize); - } - if (null != endpoint.sendBufferSize) { - socket.setSendBufferSize(endpoint.sendBufferSize); - } - socket.setReuseAddress(endpoint.reuseAddress); - socket.setSoLinger(false, -1); - - // Read Timeout - socket.setSoTimeout(endpoint.receiveTimeout); - } catch (SocketException e) { - return e; + socket.setKeepAlive(endpoint.keepAlive); + socket.setTcpNoDelay(endpoint.tcpNoDelay); + if (null != endpoint.receiveBufferSize) { + socket.setReceiveBufferSize(endpoint.receiveBufferSize); + } else { + endpoint.receiveBufferSize = socket.getReceiveBufferSize(); } - + if (null != endpoint.sendBufferSize) { + socket.setSendBufferSize(endpoint.sendBufferSize); + } else { + endpoint.sendBufferSize = socket.getSendBufferSize(); + } + socket.setReuseAddress(endpoint.reuseAddress); + socket.setSoLinger(false, -1); InetSocketAddress socketAddress; if (null == endpoint.getHostname()) { @@ -239,17 +264,18 @@ public class MllpTcpClientProducer extends DefaultProducer { } else { socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort()); } + log.debug("Connecting to socket on {}", socketAddress); - try { - socket.connect(socketAddress, endpoint.connectTimeout); - } catch (SocketTimeoutException e) { - return e; - } catch (IOException e) { - return e; + socket.connect(socketAddress, endpoint.connectTimeout); + + log.debug("Creating MllpSocketReader and MllpSocketWriter"); + mllpSocketReader = new MllpSocketReader(socket, endpoint.receiveTimeout, endpoint.readTimeout, true); + if (endpoint.bufferWrites) { + mllpSocketWriter = new MllpBufferedSocketWriter(socket, false); + } else { + mllpSocketWriter = new MllpSocketWriter(socket, false); } } - - return null; } }