Added camel-mllp component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bd1661b2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bd1661b2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bd1661b2 Branch: refs/heads/master Commit: bd1661b248c960e200bd5f69e1ba34989529a712 Parents: c34db42 Author: Quinn Stevenson <qu...@pronoia-solutions.com> Authored: Mon Dec 28 11:38:14 2015 -0700 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Dec 30 09:43:10 2015 +0100 ---------------------------------------------------------------------- .../src/main/descriptors/common-bin.xml | 1 + components/camel-mllp/LIMITATIONS.md | 35 + components/camel-mllp/README.md | 132 +++ components/camel-mllp/pom.xml | 101 +++ .../mllp/MllpAcknowledgementException.java | 39 + .../MllpAcknowledgementTimoutException.java | 38 + ...pplicationErrorAcknowledgementException.java | 38 + ...plicationRejectAcknowledgementException.java | 38 + .../camel/component/mllp/MllpComponent.java | 49 ++ .../camel/component/mllp/MllpConstants.java | 64 ++ .../mllp/MllpCorruptFrameException.java | 40 + .../camel/component/mllp/MllpEndpoint.java | 283 ++++++ .../camel/component/mllp/MllpException.java | 49 ++ .../MllpInvalidAcknowledgementException.java | 38 + .../component/mllp/MllpTcpClientProducer.java | 232 +++++ .../component/mllp/MllpTcpServerConsumer.java | 550 ++++++++++++ .../component/mllp/MllpTimeoutException.java | 39 + .../component/mllp/MllpWriteException.java | 39 + .../camel/component/mllp/impl/MllpUtil.java | 371 ++++++++ .../Hl7AcknowledgementGenerationException.java | 41 + .../mllp/Hl7AcknowledgementGenerator.java | 150 ++++ .../services/org/apache/camel/component/mllp | 1 + .../mllp/MllpProducerConsumerLoopbackTest.java | 121 +++ .../MllpTcpClientConsumerBlueprintTest.java | 36 + ...llpTcpClientProducerAcknowledgementTest.java | 138 +++ .../MllpTcpClientProducerBlueprintTest.java | 123 +++ .../mllp/MllpTcpClientProducerTest.java | 249 ++++++ .../MllpTcpServerConsumerBlueprintTest.java | 101 +++ .../MllpTcpServerConsumerConnectionTest.java | 102 +++ ...pTcpServerConsumerMulitpleTcpPacketTest.java | 120 +++ .../mllp/MllpTcpServerConsumerTest.java | 169 ++++ .../MllpTcpServerProducerBlueprintTest.java | 35 + .../junit/rule/mllp/MllpClientResource.java | 461 ++++++++++ .../MllpJUnitResourceCorruptFrameException.java | 38 + .../rule/mllp/MllpJUnitResourceException.java | 38 + .../mllp/MllpJUnitResourceTimeoutException.java | 38 + .../junit/rule/mllp/MllpServerResource.java | 878 +++++++++++++++++++ .../camel/test/mllp/Hl7MessageGenerator.java | 84 ++ .../camel/test/mllp/PassthroughProcessor.java | 47 + .../apache/camel/test/tcp/JavaSocketTests.java | 176 ++++ .../blueprint/mllp-tcp-client-producer-test.xml | 114 +++ .../blueprint/mllp-tcp-server-consumer-test.xml | 44 + .../src/test/resources/log4j.properties | 41 + components/pom.xml | 1 + parent/pom.xml | 5 + .../features/src/main/resources/features.xml | 3 + 46 files changed, 5530 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/apache-camel/src/main/descriptors/common-bin.xml ---------------------------------------------------------------------- diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml index 9d2e447..cd70770 100644 --- a/apache-camel/src/main/descriptors/common-bin.xml +++ b/apache-camel/src/main/descriptors/common-bin.xml @@ -142,6 +142,7 @@ <include>org.apache.camel:camel-mail</include> <include>org.apache.camel:camel-mina</include> <include>org.apache.camel:camel-mina2</include> + <include>org.apache.camel:camel-mllp</include> <include>org.apache.camel:camel-mongodb</include> <include>org.apache.camel:camel-mqtt</include> <include>org.apache.camel:camel-msv</include> http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/LIMITATIONS.md ---------------------------------------------------------------------- diff --git a/components/camel-mllp/LIMITATIONS.md b/components/camel-mllp/LIMITATIONS.md new file mode 100644 index 0000000..dab7957 --- /dev/null +++ b/components/camel-mllp/LIMITATIONS.md @@ -0,0 +1,35 @@ +Does not support CA, CE and CR Acknowledgements (no plans to implement) + +The MLLP Sequence protocol is not implemented (no plans to implement) + +The MllpTcpClientConsumer is not implemented yet + +The MllpTcpServerProducer is not implemented yet + +idleTimeout is not enforced yet - connections can remain open indefinately + +maxConnections are not enforced yet - an unlimited number of connections are allowed + +Suspending a MLLP Consumer is not yet implemented + +RouteBuilders for standard usage - + The plan for this is to have a few RouteBuilders packaged with the component that + can be used as-is for some of the standard use-cases ( i.e. MLLP to JMS, JMS to MLLP, etc). + +JMX Instrumentation is not yet implemented + - Manually reset connection + - Suspend a consumer + +MLLP Statistics are not yet implemented + TCP Server + - last message received time + - last AA sent time + - last AE sent time + - last AR sent time + - AA, AE & AR counts + - last message sent time + - last message sent time + - last AA received time + - last AE received time + - last AR received time + - AA, AE & AR counts http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/README.md ---------------------------------------------------------------------- diff --git a/components/camel-mllp/README.md b/components/camel-mllp/README.md new file mode 100644 index 0000000..e7b0fc9 --- /dev/null +++ b/components/camel-mllp/README.md @@ -0,0 +1,132 @@ +# camel-mllp +Camel MLLP Component + +See MLLP Specification here: +http://www.hl7.org/documentcenter/public_temp_E7494E36-1C23-BA17-0C5E72EF77542E1F/wg/inm/mllp_transport_specification.PDF + +# Description +The camel-mllp component is specifically designed to handle the nuances of the MLLP protocol and provide +the functionality required by Healthcare providers to communicate with other systems using the MLLP protocol. + +The component is capable of handling both byte[] and String payloads which allows using other HL7 Libraries (i.e. HAPI) +to parse the messages. When String payloads are specified, the component will use the systems default Charset for +encoding/decoding, but this can be overridden. + +The component provides a simple URI for configuring MLLP endpoints: +MLLP-Producers: + to( "mllp://<host or ip>:<port>" ) +MLLP-Consumers: + from("mllp://<host or ip>:<listening port>) to listen on a specific address + from("mllp://port") to listen on all local addresses + +MLLP-Producers also interrogate the HL7 Acknowledgment received from the external system and if a negative acknowledgment +is received, the producer sets an exception on the exchange indicating the type of negative acknowledgement that was +received (i.e. a HL7 Application Reject Acknowledgement or a HL7 Application Error Acknowledgement). This enables +the use of Camel Redelivery Policies to configure redelivery attempts and routing erroneous messages to alternate +endpoints for analysis. + +MLLP-Consumers will, by default, automatically generate an acknowledgement. A HL7 Application Accept Acknowledgment +will be generated for successfully processed messages, or a HL7 Application Error Acknowledgement for messages where an +exception is raised during the processing of the exchange. The HL7 acknowledgement can also be specified by setting the +CamelMllpAcknowledgement property on the exchange - if present, the value of this property will be used for the HL7 +acknowledgment. The automatic generation of an HL7 acknowledgment can be completely disabled by setting the autoAck +URI parameter to false. If autoAck=false and the CamelMllpAcknowledgment property is not set on the exchange, and +exception will be raised. + +The component also provides a Camel Processor that is capable of generating HL7 Acknowledgements. Therefore, the HAPI +is not required to generate HL7 Acknowledgements - however, it can be used if desired. +By default, the processor will generate a HL7 Application Accept Acknowledgement if there is not an exception on the +Camel Exchange - otherwise it will generate a HL7 Application Error Acknowledgement. The generated acknowledgement +is placed in the CamelMllpAcknowledgment property on the Exchange. + +Regardless of whether the HL7 Acknowledgment is generated or specified using the CamelMllpAcknowledgement Exchange property, +the MLLP-Consumer will set the CamelHllpAcknowledgement and CamelHllpAcknowledgementCode headers on the Message after the +acknowledgment is successfully transmitted to the external system. + +Since the MLLP protocol does not typically use a large number of concurrent connections, the camel-mllp component uses +a simple thread-per-connection model based an standard Java Sockets. This keeps the implementation simple, and also +eliminates the dependencies on other camel components. + +# Rationalization +While Camel already includes some of this functionality in the camel-hl7 component, the implementation does not +provide all of the functionality required to effectively handle the MLLP protocol - especially under adverse conditions. + +The camel-hl7 Mina2 codec and Netty4 decoder do not handle MLLP Framing errors very well - the component will hang waiting +for frames to complete in some instances. + +While both camel-mina2 and camel-netty4 provide a "timeout" function, it is only applied to Producers. MLLP Consumers +also need to be able to timeout to recover from MLLP framing errors. Additionally, the timeout functionality of the +camel-netty4 component is disable after any data is received on the connection, making in ineffective for detecting +timeouts after the first messages is received. + +Also, neither the Mina2 codec nor the Netty4 decoder interrogate HL7 Acknowledgments. Therefore, it is much more +difficult to use the redelivery and error handling features Camel provides. + +The above issues may be addressable by updating/patching the existing components, but there is one more that is not. +Both camel-netty4 and camel-mina2 are designed to handle a large number of concurrent connections, rapid connect/disconnect +rates, and asynchronous communication. Forcing, these components to deal with the small number of stateful connections +inherent to the MLLP protocol seems inappropriate. + +An attempt was made to update the camel-netty4 decoder provided by the camel-hl7 component to deal with the nuances of +the MLLP protocol, but it quickly became very complicated. The decoder was updated to correctly deal with the MLLP +frame. The current implementation is based on the Netty DelimiterBasedFrameDecoder, but this decoder only looks for a +single byte terminator and MLLP uses two bytes to terminate it's frame. Additionally, the second terminating byte of the +MLLP frame can't be used alone because that byte is also contained in HL7 messages (it is the HL7 Segment Delimiter). +An implementation resembling the Netty LineBasedFrameDecoder was written at it correctly handled the MLLP framing issues, +but the timout issues were never addressed. + +For MLLP Consumers, the decoder needs to dynamically install a timeout handler whenver a partial MLLP frame is received, +and then remove it when the frame is completed. For MLLP Producers, the decoder would need to install a timeout handler +at some point to enable detecting a missing acknowledgement in addition to the timeout handler to deal with the +partial/incomplete acknowledgement. + +# MLLP Background +The MLLP protocol is inherently synchronous because external systems almost always require the order of messages to be +maintained (i.e. FIFO delivery). + +When a MLLP-Producer sends a message to an external system, it is required to wait for +an HL7 Acknowledgement before sending the next message. Additionally, the content of the acknowlegement must be examined +to determine the specific type of the acknowlegement before the next message can be transmitted. If the acknowledgement +is a HL7 Application Error Acknowledgement, the MLLP-Producer should retransmit the message a few times (the number of +redelivery attempts is application specific). If the acknowledgment is a HL7 Application Reject Acknowledgement, there +is something wrong with the message and redelivery will never succeed. The HL7 Messages acknowledged with an HL7 Application +Reject Acknowledgement must be routed to an alternate destination to allow users to investigate the nature of the error +so it can be corrected. + +When a MLLP-Consumer receives a message from an external system, the sending system is required to wait for an HL7 Acknowledgement +before transmitting the next message. The MLLP-Consumer normally persists the message in a durable store and then replies +to the sending system with a HL7 Application Accept Acknowledgement. If a transient error occurs while persisting the +message, the MLLP-Consumer should reply with a HL7 Application Error Acknowledgment and allow the external system to resend +the message. If the MLLP-Consumer detects the that the received message is invalid for some reason and the message could +never be processed, it should reply with an HL7 Application Reject Acknowledgement and the sending system should not +attempt to resend the message. + +NOTE: Some external systems do not handle HL7 NACKS ( HL7 Application Reject Acknowledgments and HL7 Application Error +Acknowledgements) - they do not interrogate the HL7 Acknowledgment to determine if it is a negative acknowledgement and +assume any acknowledgement received is an HL7 ACK (HL7 Application Accept Acknowledgement). In order to prevent message +loss when dealing with external systems that behave in this fashion, the MLLP-Consumer must be capable of closing the +TCP connection in lew of sending and HL7 NACK, which will force the external system to resend the message. Additionally, +the MLLP-Consumer may be required to behave differently for each type of HL7 NACK - it may need to close the TCP connection +instead of sending HL7 Application Error Acknowledgements, and route the messages that would be normally not be persisted +to an alternate durable store before sending the the HL7 Application Reject acknowledgement. + +Systems using the MLLP protocol normally use stateful TCP connections - the connections are established and left open +for extended periods of time. + +A MLLP-Consumer endpoint may have more than one TCP connection at a given time, but this is not the typical case. Normally +there is a single active TCP connection to a MLLP-Consumer. + +A MLLP-Producer endpoint should only have a single TCP connection at any given time. If the producer attempts to open more +than one connection to an external system, it oftentimes causes issues with the external system. Additionally, since FIFO +must be maintained, only + +# CamelPatches + +To help simplify bringing Camel-MLLP into Apache Camel, we have take care to build a set of patches that help integrate the component +into Camel. + +features.xml is a diff file that will add Camel-MLLP to the camel features list when on Apache Karaf. + +parent-pom.xml is a diff file that contains an update to Camel's Parent pom. + +common-bin.xml is a diff file that contains an update to the common distribution's include statements. http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-mllp/pom.xml b/components/camel-mllp/pom.xml new file mode 100644 index 0000000..3a41bcf --- /dev/null +++ b/components/camel-mllp/pom.xml @@ -0,0 +1,101 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.17-SNAPSHOT</version> + </parent> + + <artifactId>camel-mllp</artifactId> + <packaging>bundle</packaging> + <name>Camel :: MLLP</name> + <description>Camel MLLP Component</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.mllp.*,org.apache.camel.processor.mllp.*</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=mllp</camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + + + <!-- support camel documentation --> + <!--dependency> + <groupId>org.apache.camel</groupId> + <artifactId>apt</artifactId> + </dependency --> + + <!-- logging --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + + <!-- testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-blueprint</artifactId> + <scope>test</scope> + </dependency> + + <!-- test logging --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <!-- Ensure that test runs do no leave running JVMs --> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19</version> + <configuration> + <shutdown>kill</shutdown> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java new file mode 100644 index 0000000..624745c --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Base class for HL7 Application Acknowledgement Exceptions + */ +public abstract class MllpAcknowledgementException extends MllpException { + public MllpAcknowledgementException(String message) { + super(message); + } + + public MllpAcknowledgementException(String message, byte[] mllpPayload) { + super(message, mllpPayload); + } + + public MllpAcknowledgementException(String message, Throwable cause) { + super(message, cause); + } + + public MllpAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) { + super(message, mllpPayload, cause); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java new file mode 100644 index 0000000..9288a14 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Raised when a MLLP Producer does not receive a HL7 acknowledgement within the configured timespan + */ +public class MllpAcknowledgementTimoutException extends MllpTimeoutException { + public MllpAcknowledgementTimoutException(String message) { + super(message); + } + + public MllpAcknowledgementTimoutException(String message, byte[] mllpPayload) { + super(message, mllpPayload); + } + + public MllpAcknowledgementTimoutException(String message, Throwable cause) { + super(message, cause); + } + + public MllpAcknowledgementTimoutException(String message, byte[] mllpPayload, Throwable cause) { + super(message, mllpPayload, cause); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java new file mode 100644 index 0000000..7d7ecd2 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Raised when a MLLP Producer receives a HL7 Application Error Acknowledgement + */ +public class MllpApplicationErrorAcknowledgementException extends MllpAcknowledgementException { + public MllpApplicationErrorAcknowledgementException(String message) { + super(message); + } + + public MllpApplicationErrorAcknowledgementException(String message, byte[] mllpPayload) { + super(message, mllpPayload); + } + + public MllpApplicationErrorAcknowledgementException(String message, Throwable cause) { + super(message, cause); + } + + public MllpApplicationErrorAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) { + super(message, mllpPayload, cause); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java new file mode 100644 index 0000000..ef2a39d --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Raised when a MLLP Producer receives a HL7 Application Reject Acknowledgement + */ +public class MllpApplicationRejectAcknowledgementException extends MllpAcknowledgementException { + public MllpApplicationRejectAcknowledgementException(String message) { + super(message); + } + + public MllpApplicationRejectAcknowledgementException(String message, byte[] mllpPayload) { + super(message, mllpPayload); + } + + public MllpApplicationRejectAcknowledgementException(String message, Throwable cause) { + super(message, cause); + } + + public MllpApplicationRejectAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) { + super(message, mllpPayload, cause); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java new file mode 100644 index 0000000..d815158 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.UriEndpointComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents the component that manages {@link MllpEndpoint}. + */ +public class MllpComponent extends UriEndpointComponent { + public static final String MLLP_LOG_PHI_PROPERTY = "org.apache.camel.mllp.logPHI"; + + Logger log = LoggerFactory.getLogger(this.getClass()); + + public MllpComponent() { + super(MllpEndpoint.class); + } + + public MllpComponent(CamelContext context) { + super(context, MllpEndpoint.class); + } + + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + Endpoint endpoint = new MllpEndpoint(uri, this); + setProperties(endpoint, parameters); + return endpoint; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java new file mode 100644 index 0000000..da5eb77 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +public final class MllpConstants { + public static final String MLLP_LOCAL_ADDRESS = "CamelMllpLocalAddress"; + public static final String MLLP_REMOTE_ADDRESS = "CamelMllpRemoteAddress"; + + public static final String MLLP_ACKNOWLEDGEMENT = "CamelMllpAcknowledgement"; + public static final String MLLP_ACKNOWLEDGEMENT_TYPE = "CamelMllpAcknowledgementType"; + + /* + Connection Control Exchange Properties + - For Consumers, "SEND" => ACKNOWLEDGEMENT + - For Producers, "SEND" => MESSAGE + */ + public static final String MLLP_CLOSE_CONNECTION_BEFORE_SEND = "CamelMllpCloseConnectionBeforeSend"; + public static final String MLLP_RESET_CONNECTION_BEFORE_SEND = "CamelMllpResetConnectionBeforeSend"; + public static final String MLLP_CLOSE_CONNECTION_AFTER_SEND = "CamelMllpCloseConnectionAfterSend"; + public static final String MLLP_RESET_CONNECTION_AFTER_SEND = "CamelMllpResetConnectionAfterSend"; + + // MSH-3 + public static final String MLLP_SENDING_APPLICATION = "CamelMllpSendingApplication"; + // MSH-4 + public static final String MLLP_SENDING_FACILITY = "CamelMllpSendingFacility"; + // MSH-5 + public static final String MLLP_RECEIVING_APPLICATION = "CamelMllpReceivingApplication"; + // MSH-6 + public static final String MLLP_RECEIVING_FACILITY = "CamelMllpReceivingFacility"; + // MSH-7 + public static final String MLLP_TIMESTAMP = "CamelMllpTimestamp"; + // MSH-8 + public static final String MLLP_SECURITY = "CamelMllpSecurity"; + // MSH-9 + public static final String MLLP_MESSAGE_TYPE = "CamelMllpMessageType"; + // MSH-9.1 + public static final String MLLP_EVENT_TYPE = "CamelMllpEventType"; + // MSH-9.2 + public static final String MLLP_TRIGGER_EVENT = "CamelMllpTriggerEvent"; + // MSH-10 + public static final String MLLP_MESSAGE_CONTROL = "CamelMllpMessageControlId"; + // MSH-11 + public static final String MLLP_PROCESSING_ID = "CamelMllpProcessingId"; + // MSH-12 + public static final String MLLP_VERSION_ID = "CamelMllpVersionId"; + // MSH-18 + public static final String MLLP_CHARSET = "CamelMllpCharset"; + + private MllpConstants() { } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java new file mode 100644 index 0000000..0310530 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Raised when a MLLP Producer or consumer encounters a corrupt MLLP Frame while attempting + * to read or write a MLLP payload. + */ +public class MllpCorruptFrameException extends MllpException { + public MllpCorruptFrameException(String message) { + super(message); + } + + public MllpCorruptFrameException(String message, byte[] mllpPayload) { + super(message, mllpPayload); + } + + public MllpCorruptFrameException(String message, Throwable cause) { + super(message, cause); + } + + public MllpCorruptFrameException(String message, byte[] mllpPayload, Throwable cause) { + super(message, mllpPayload, cause); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java new file mode 100644 index 0000000..eda5757 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents a MLLP endpoint. + * + * NOTE: MLLP payloads are not logged unless the logging level is set to DEBUG or TRACE to avoid introducing PHI + * into the log files. Logging of PHI can be globally disabled by setting the org.apache.camel.mllp.logPHI system + * property to false. + * <p/> + */ +@UriEndpoint(scheme = "mllp", title = "mllp", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp") +public class MllpEndpoint extends DefaultEndpoint { + public static final char START_OF_BLOCK = 0x0b; // VT (vertical tab) - decimal 11, octal 013 + public static final char END_OF_BLOCK = 0x1c; // FS (file separator) - decimal 28, octal 034 + public static final char END_OF_DATA = 0x0d; // CR (carriage return) - decimal 13, octal 015 + public static final int END_OF_STREAM = -1; // + public static final char SEGMENT_DELIMITER = 0x0d; // CR (carriage return) - decimal 13, octal 015 + public static final char MESSAGE_TERMINATOR = 0x0a; // LF (line feed, new line) - decimal 10, octal 012 + + Logger log = LoggerFactory.getLogger(this.getClass()); + + @UriPath(defaultValue = "0.0.0.0", description = "Hostname or IP for connection") + String hostname = "0.0.0.0"; + + @UriPath(description = "TCP Port for connection") + int port = -1; + + // TODO: Move URI Params to a MllpConfiguration class + @UriParam(defaultValue = "5", description = "TCP Server only - The maximum queue length for incoming connection indications (a request to connect) is set to the backlog parameter. If a " + + "connection indication arrives when the queue is full, the connection is refused.") + int backlog = 5; + + @UriParam(defaultValue = "30000", description = "TCP Server only - timeout value while waiting for a TCP listener to start (milliseconds)") + int bindTimeout = 30000; + + @UriParam(defaultValue = "30000", description = "TCP Server only - timeout value while waiting for a TCP connection (milliseconds)") + int acceptTimeout = 30000; + + @UriParam(defaultValue = "30000", description = "TCP Client only - timeout value while establishing for a TCP connection (milliseconds)") + int connectTimeout = 30000; + + @UriParam(defaultValue = "5000", description = "Timeout value (milliseconds) used when reading a message from an external") + int responseTimeout = 5000; + + @UriParam(defaultValue = "true", description = "Enable/disable the SO_KEEPALIVE socket option.") + boolean keepAlive = true; + + @UriParam(defaultValue = "true", description = "Enable/disable the TCP_NODELAY socket option.") + boolean tcpNoDelay = true; + + @UriParam(defaultValue = "false", description = "Enable/disable the SO_REUSEADDR socket option.") + boolean reuseAddress; + + @UriParam(description = "Sets the SO_RCVBUF option to the specified value") + Integer receiveBufferSize; + + @UriParam(description = "Sets the SO_SNDBUF option to the specified value") + Integer sendBufferSize; + + @UriParam(defaultValue = "0", description = "The amount of time a TCP connection can remain idle before it is closed") + int idleTimeout; + + @UriParam(description = "The TCP mode of the endpoint (client or server). Defaults to client for Producers and server for Consumers") + String tcpMode; + + @UriParam(defaultValue = "true", description = "MLLP Consumers only - Automatically generate and send an MLLP Acknowledgement") + boolean autoAck = true; + + @UriParam(description = "Set the CamelCharsetName property on the exchange") + String charsetName; + + public MllpEndpoint(String uri, MllpComponent component) { + super(uri, component); + + // mllp://hostname:port + String hostPort; + // look for options + int optionsStartIndex = uri.indexOf('?'); + if (-1 == optionsStartIndex) { + // No options - just get the host/port stuff + hostPort = uri.substring(7); + } else { + hostPort = uri.substring(7, optionsStartIndex); + } + + // Make sure it has a host - may just be a port + int colonIndex = hostPort.indexOf(':'); + if (-1 != colonIndex) { + hostname = hostPort.substring(0, colonIndex); + port = Integer.parseInt(hostPort.substring(colonIndex + 1)); + } else { + // No host specified - leave the default host and set the port + port = Integer.parseInt(hostPort); + } + } + + + @Override + public ExchangePattern getExchangePattern() { + return ExchangePattern.InOut; + } + + @Override + public Exchange createExchange() { + return this.createExchange(getExchangePattern()); + } + + @Override + public Exchange createExchange(ExchangePattern exchangePattern) { + Exchange mllpExchange = super.createExchange(exchangePattern); + setExchangeProperties(mllpExchange); + + return mllpExchange; + } + + @Override + public Exchange createExchange(Exchange exchange) { + Exchange mllpExchange = super.createExchange(exchange); + setExchangeProperties(mllpExchange); + + return mllpExchange; + } + + private void setExchangeProperties(Exchange mllpExchange) { + if (null != charsetName) { + mllpExchange.setProperty(Exchange.CHARSET_NAME, charsetName); + } + } + + public Producer createProducer() throws Exception { + log.trace("({}).createProducer()", this.getEndpointKey()); + + return new MllpTcpClientProducer(this); + } + + public Consumer createConsumer(Processor processor) throws Exception { + log.trace("({}).createConsumer(processor)", this.getEndpointKey()); + + return new MllpTcpServerConsumer(this, processor); + } + + @Override + public boolean isSynchronous() { + return true; + } + + public boolean isSingleton() { + return true; + } + + public String getCharsetName() { + return charsetName; + } + + public void setCharsetName(String charsetName) { + this.charsetName = charsetName; + } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + /** + * The maximum queue length for incoming connection indications (a request to connect) is set to the backlog parameter. If a connection indication arrives when the queue is full, the connection + * is refused. + */ + public int getBacklog() { + return backlog; + } + + /** + * The maximum queue length for incoming connection indications (a request to connect) is set to the backlog parameter. If a connection indication arrives when the queue is full, the connection + * is refused. + */ + public void setBacklog(int backlog) { + this.backlog = backlog; + } + + public int getAcceptTimeout() { + return acceptTimeout; + } + + public void setAcceptTimeout(int acceptTimeout) { + this.acceptTimeout = acceptTimeout; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public int getResponseTimeout() { + return responseTimeout; + } + + public void setResponseTimeout(int responseTimeout) { + this.responseTimeout = responseTimeout; + } + + public boolean isKeepAlive() { + return keepAlive; + } + + public void setKeepAlive(boolean keepAlive) { + this.keepAlive = keepAlive; + } + + public boolean isTcpNoDelay() { + return tcpNoDelay; + } + + public void setTcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public boolean isReuseAddress() { + return reuseAddress; + } + + public void setReuseAddress(boolean reuseAddress) { + this.reuseAddress = reuseAddress; + } + + public int getReceiveBufferSize() { + return receiveBufferSize; + } + + public void setReceiveBufferSize(int receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + } + + public int getSendBufferSize() { + return sendBufferSize; + } + + public void setSendBufferSize(int sendBufferSize) { + this.sendBufferSize = sendBufferSize; + } + + public boolean isAutoAck() { + return autoAck; + } + + public void setAutoAck(boolean autoAck) { + this.autoAck = autoAck; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java new file mode 100644 index 0000000..a1bb182 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Base class for all MLLP Exceptions, and also used as a generic MLLP exception + */ +public class MllpException extends Exception { + private final byte[] mllpPayload; + + public MllpException(String message) { + super(message); + this.mllpPayload = null; + } + + public MllpException(String message, byte[] mllpPayload) { + super(message); + this.mllpPayload = mllpPayload; + } + + public MllpException(String message, Throwable cause) { + super(message, cause); + this.mllpPayload = null; + } + + public MllpException(String message, byte[] mllpPayload, Throwable cause) { + super(message, cause); + this.mllpPayload = mllpPayload; + } + + public byte[] getMllpPayload() { + return mllpPayload; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java new file mode 100644 index 0000000..495cfae --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +/** + * Raised when a MLLP Producer receives a HL7 Acknowledgement for which the HL7 Acknowledgement type cannot be determined. + */ +public class MllpInvalidAcknowledgementException extends MllpAcknowledgementException { + public MllpInvalidAcknowledgementException(String message) { + super(message); + } + + public MllpInvalidAcknowledgementException(String message, byte[] mllpPayload) { + super(message, mllpPayload); + } + + public MllpInvalidAcknowledgementException(String message, Throwable cause) { + super(message, cause); + } + + public MllpInvalidAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) { + super(message, mllpPayload, cause); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bd1661b2/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java new file mode 100644 index 0000000..7b9d86b --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.SocketTimeoutException; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.component.mllp.impl.MllpUtil; +import org.apache.camel.impl.DefaultProducer; + +import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_AFTER_SEND; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_BEFORE_SEND; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_LOCAL_ADDRESS; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_REMOTE_ADDRESS; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_RESET_CONNECTION_AFTER_SEND; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_RESET_CONNECTION_BEFORE_SEND; +import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER; + +/** + * The MLLP producer. + */ +public class MllpTcpClientProducer extends DefaultProducer { + MllpEndpoint endpoint; + + Socket socket; + + public MllpTcpClientProducer(MllpEndpoint endpoint) throws SocketException { + super(endpoint); + log.trace("MllpTcpClientProducer(endpoint)"); + + this.endpoint = endpoint; + } + + @Override + protected void doStart() throws Exception { + log.trace("doStart()"); + + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + log.trace("doStop()"); + + MllpUtil.closeConnection(socket); + + super.doStop(); + } + + @Override + public void process(Exchange exchange) throws Exception { + log.trace("process(exchange)"); + + // Check BEFORE_SEND Properties + if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)) { + MllpUtil.resetConnection(socket); + return; + } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)) { + MllpUtil.closeConnection(socket); + } + + Exception connectionException = checkConnection(); + if (null != connectionException) { + exchange.setException(connectionException); + return; + } + + Message message; + if (exchange.hasOut()) { + message = exchange.getOut(); + } else { + message = exchange.getIn(); + } + + byte[] hl7MessageBytes = message.getMandatoryBody(byte[].class); + + log.debug("Sending message to external system"); + try { + MllpUtil.writeFramedPayload(socket, hl7MessageBytes); + } catch (MllpException mllpEx) { + exchange.setException(mllpEx); + return; + } + + log.debug("Reading acknowledgement from external system"); + byte[] acknowledgementBytes = null; + try { + MllpUtil.openFrame(socket); + acknowledgementBytes = MllpUtil.closeFrame(socket); + } catch (SocketTimeoutException timeoutEx) { + exchange.setException(new MllpAcknowledgementTimoutException("Acknowledgement timout", timeoutEx)); + return; + } catch (MllpException mllpEx) { + exchange.setException(mllpEx); + return; + } + + log.debug("Populating the exchange with the acknowledgement from the external system"); + message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes); + + message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString()); + message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress()); + + // Now, extract the acknowledgement type and check for a NACK + byte fieldDelim = acknowledgementBytes[3]; + // First, find the beginning of the MSA segment - should be the second segment + int msaStartIndex = -1; + for (int i = 0; i < acknowledgementBytes.length; ++i) { + if (SEGMENT_DELIMITER == acknowledgementBytes[i]) { + final byte bM = 77; + final byte bS = 83; + final byte bA = 65; + final byte bE = 69; + final byte bR = 82; + /* We've found the start of a new segment - make sure peeking ahead + won't run off the end of the array - we need at least 7 more bytes + */ + if (acknowledgementBytes.length > i + 7) { + // We can safely peek ahead + if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) { + // Found the beginning of the MSA - the next two bytes should be our acknowledgement code + msaStartIndex = i + 1; + if (bA != acknowledgementBytes[i + 5]) { + exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes))); + } else { + switch (acknowledgementBytes[i + 6]) { + case bA: + // We have an AA - make sure that's the end of the field + if (fieldDelim != acknowledgementBytes[i + 7]) { + exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes))); + } + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA"); + break; + case bE: + // We have an AE + exchange.setException(new MllpApplicationErrorAcknowledgementException(new String(acknowledgementBytes))); + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE"); + break; + case bR: + exchange.setException(new MllpApplicationRejectAcknowledgementException(new String(acknowledgementBytes))); + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR"); + break; + default: + exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes))); + } + } + + break; + } + } + } + + } + if (-1 == msaStartIndex) { + // Didn't find an MSA + exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes))); + } + // Check AFTER_SEND Properties + if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) { + MllpUtil.resetConnection(socket); + return; + } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) { + MllpUtil.closeConnection(socket); + } + } + + /** + * Validate the TCP Connection + * + * @return null if the connection is valid, otherwise the Exception encounted checking the connection + */ + Exception checkConnection() { + if (null == socket || socket.isClosed() || !socket.isConnected()) { + socket = new Socket(); + + try { + socket.setKeepAlive(endpoint.keepAlive); + socket.setTcpNoDelay(endpoint.tcpNoDelay); + if (null != endpoint.receiveBufferSize) { + socket.setReceiveBufferSize(endpoint.receiveBufferSize); + } + if (null != endpoint.sendBufferSize) { + socket.setSendBufferSize(endpoint.sendBufferSize); + } + socket.setReuseAddress(endpoint.reuseAddress); + socket.setSoLinger(false, -1); + + // Read Timeout + socket.setSoTimeout(endpoint.responseTimeout); + } catch (SocketException e) { + return e; + } + + + SocketAddress address = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort()); + log.debug("Connecting to socket on {}", address); + try { + socket.connect(address, endpoint.connectTimeout); + } catch (SocketTimeoutException e) { + return e; + } catch (IOException e) { + return e; + } + } + + return null; + + } + +}