http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java index 05b8cbe..0ec8564 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java @@ -15,7 +15,6 @@ * limitations under the License. */ package org.apache.camel.component.mllp; - import java.io.IOException; import java.io.InputStream; import java.net.BindException; @@ -29,18 +28,31 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; + import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.Processor; -import org.apache.camel.component.mllp.impl.AcknowledgmentSynchronizationAdapter; -import org.apache.camel.component.mllp.impl.MllpUtil; +import org.apache.camel.component.mllp.impl.Hl7Util; +import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter; +import org.apache.camel.component.mllp.impl.MllpSocketReader; +import org.apache.camel.component.mllp.impl.MllpSocketUtil; +import org.apache.camel.component.mllp.impl.MllpSocketWriter; +import org.apache.camel.converter.IOConverter; import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerationException; import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerator; import org.apache.camel.util.IOHelper; +import org.slf4j.MDC; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE; import static org.apache.camel.component.mllp.MllpConstants.MLLP_AUTO_ACKNOWLEDGE; import static org.apache.camel.component.mllp.MllpConstants.MLLP_CHARSET; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_AFTER_SEND; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_BEFORE_SEND; import static org.apache.camel.component.mllp.MllpConstants.MLLP_EVENT_TYPE; import static org.apache.camel.component.mllp.MllpConstants.MLLP_LOCAL_ADDRESS; import static org.apache.camel.component.mllp.MllpConstants.MLLP_MESSAGE_CONTROL; @@ -49,6 +61,8 @@ import static org.apache.camel.component.mllp.MllpConstants.MLLP_PROCESSING_ID; import static org.apache.camel.component.mllp.MllpConstants.MLLP_RECEIVING_APPLICATION; import static org.apache.camel.component.mllp.MllpConstants.MLLP_RECEIVING_FACILITY; import static org.apache.camel.component.mllp.MllpConstants.MLLP_REMOTE_ADDRESS; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_RESET_CONNECTION_AFTER_SEND; +import static org.apache.camel.component.mllp.MllpConstants.MLLP_RESET_CONNECTION_BEFORE_SEND; import static org.apache.camel.component.mllp.MllpConstants.MLLP_SECURITY; import static org.apache.camel.component.mllp.MllpConstants.MLLP_SENDING_APPLICATION; import static org.apache.camel.component.mllp.MllpConstants.MLLP_SENDING_FACILITY; @@ -56,16 +70,19 @@ import static org.apache.camel.component.mllp.MllpConstants.MLLP_TIMESTAMP; import static org.apache.camel.component.mllp.MllpConstants.MLLP_TRIGGER_EVENT; import static org.apache.camel.component.mllp.MllpConstants.MLLP_VERSION_ID; import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER; -import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK; /** * The MLLP consumer. */ public class MllpTcpServerConsumer extends DefaultConsumer { + public static final int SOCKET_STARTUP_TEST_WAIT = 100; + public static final int SOCKET_STARTUP_TEST_READ_TIMEOUT = 250; ServerSocketThread serverSocketThread; List<ClientSocketThread> clientThreads = new LinkedList<>(); + Hl7AcknowledgementGenerator acknowledgementGenerator = new Hl7AcknowledgementGenerator(); + private final MllpEndpoint endpoint; public MllpTcpServerConsumer(MllpEndpoint endpoint, Processor processor) { @@ -191,11 +208,10 @@ public class MllpTcpServerConsumer extends DefaultConsumer { * short. */ public void run() { - log.debug("Starting acceptor thread"); + MDC.put("camel.contextId", endpoint.getCamelContext().getName()); try { while (!isInterrupted() && null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) { - // TODO: Need to check maxConnections and figure out what to do when exceeded Socket socket = null; try { socket = serverSocket.accept(); @@ -229,16 +245,15 @@ public class MllpTcpServerConsumer extends DefaultConsumer { /* Wait a bit and then check and see if the socket is really there - it could be a load balancer pinging the port */ - Thread.sleep(100); if (socket.isConnected() && !socket.isClosed()) { - log.debug("Socket appears to be there - check for available data"); + log.debug("Socket appears to be there - checking for available data in {} milliseconds", SOCKET_STARTUP_TEST_WAIT); + Thread.sleep(SOCKET_STARTUP_TEST_WAIT); + InputStream inputStream; try { inputStream = socket.getInputStream(); } catch (IOException ioEx) { - // Bad Socket - - log.warn("Failed to retrieve the InputStream for socket after the initial connection was accepted"); - MllpUtil.resetConnection(socket); + MllpSocketUtil.reset(socket, log, "Failed to retrieve the InputStream for socket after the initial connection was accepted"); continue; } @@ -251,13 +266,12 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } // The easy check failed - so trigger a blocking read - socket.setSoTimeout(100); + MllpSocketUtil.setSoTimeout(socket, SOCKET_STARTUP_TEST_READ_TIMEOUT, log, "Preparing to check for available data on component startup"); try { int tmpByte = inputStream.read(); - socket.setSoTimeout(endpoint.receiveTimeout); if (-1 == tmpByte) { - log.debug("Socket.read() returned END_OF_STREAM - resetting connection"); - MllpUtil.resetConnection(socket); + log.debug("Check for available data failed - Socket.read() returned END_OF_STREAM"); + MllpSocketUtil.close(socket, null, null); } else { ClientSocketThread clientThread = new ClientSocketThread(socket, tmpByte); clientThreads.add(clientThread); @@ -265,10 +279,20 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } } catch (SocketTimeoutException timeoutEx) { // No data, but the socket is there - log.debug("No Data - but the socket is there. Starting ClientSocketThread"); + String logMessageFormat = + "Check for available data failed - Socket.read() timed-out after {} milliseconds." + + " No Data - but the socket is there. Starting ClientSocketThread"; + log.debug(logMessageFormat, SOCKET_STARTUP_TEST_READ_TIMEOUT); ClientSocketThread clientThread = new ClientSocketThread(socket, null); clientThreads.add(clientThread); clientThread.start(); + } catch (IOException ioEx) { + log.debug("Ignoring IOException encountered when attempting to read a byte - connection was reset"); + try { + socket.close(); + } catch (IOException closeEx) { + log.debug("Ignoring IOException encountered when attempting to close the connection after the connection reset was detected", closeEx); + } } } } catch (SocketTimeoutException timeoutEx) { @@ -320,10 +344,12 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } /** - * Nested Class read the Socket + * Nested Class reads the Socket */ class ClientSocketThread extends Thread { - Socket clientSocket; + final Socket clientSocket; + final MllpSocketReader mllpSocketReader; + final MllpSocketWriter mllpSocketWriter; Integer initialByte; @@ -343,8 +369,14 @@ public class MllpTcpServerConsumer extends DefaultConsumer { this.clientSocket.setSoLinger(false, -1); // Initial Read Timeout - this.clientSocket.setSoTimeout(endpoint.receiveTimeout); + MllpSocketUtil.setSoTimeout(clientSocket, endpoint.receiveTimeout, log, "Constructing ClientSocketThread"); + mllpSocketReader = new MllpSocketReader(this.clientSocket, endpoint.receiveTimeout, endpoint.readTimeout, false); + if (endpoint.bufferWrites) { + mllpSocketWriter = new MllpBufferedSocketWriter(this.clientSocket, true); + } else { + mllpSocketWriter = new MllpSocketWriter(this.clientSocket, true); + } } /** @@ -375,78 +407,238 @@ public class MllpTcpServerConsumer extends DefaultConsumer { @Override public void run() { int receiveTimeoutCounter = 0; + MDC.put("camel.contextId", endpoint.getCamelContext().getName()); while (!isInterrupted() && null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) { byte[] hl7MessageBytes = null; - // Send the message on for processing and wait for the response - log.debug("Reading data ...."); + + log.debug("Checking for data ...."); try { - if (null != initialByte && START_OF_BLOCK == initialByte) { - hl7MessageBytes = MllpUtil.closeFrame(clientSocket, endpoint.receiveTimeout, endpoint.readTimeout); - } else { - try { - if (!MllpUtil.openFrame(clientSocket, endpoint.receiveTimeout, endpoint.readTimeout)) { - receiveTimeoutCounter = 0; - continue; - } else { - receiveTimeoutCounter = 0; - } - } catch (SocketTimeoutException timeoutEx) { - // When thrown by openFrame, it indicates that no data was available - but no error - if (endpoint.maxReceiveTimeouts > 0 && ++receiveTimeoutCounter >= endpoint.maxReceiveTimeouts) { - // TODO: Enhance logging?? - log.warn("Idle Client - resetting connection"); - MllpUtil.resetConnection(clientSocket); - } - continue; + hl7MessageBytes = mllpSocketReader.readEnvelopedPayload(initialByte); + if (hl7MessageBytes == null) { + // No data received - check for max timeouts + if (endpoint.maxReceiveTimeouts > 0 && ++receiveTimeoutCounter >= endpoint.maxReceiveTimeouts) { + String reasonMessage = String.format("Idle Client after %d receive timeouts [%d-milliseconds] - resetting connection", receiveTimeoutCounter, endpoint.receiveTimeout); + MllpSocketUtil.reset(clientSocket, log, reasonMessage); } - hl7MessageBytes = MllpUtil.closeFrame(clientSocket, endpoint.receiveTimeout, endpoint.readTimeout); + continue; } } catch (MllpException mllpEx) { Exchange exchange = endpoint.createExchange(ExchangePattern.InOut); exchange.setException(mllpEx); - return; + log.warn("Exception encountered reading payload - sending exception to route", mllpEx); + try { + getProcessor().process(exchange); + } catch (Exception e) { + log.error("Exception encountered processing exchange with exception encounter reading payload", e); + } + continue; } finally { initialByte = null; } - if (null == hl7MessageBytes) { + // Send the message on for processing and wait for the response + log.debug("Populating the exchange with received message"); + Exchange exchange = endpoint.createExchange(ExchangePattern.InOut); + try { + createUoW(exchange); + Message message = exchange.getIn(); + message.setBody(hl7MessageBytes, byte[].class); + + message.setHeader(MLLP_LOCAL_ADDRESS, clientSocket.getLocalAddress().toString()); + message.setHeader(MLLP_REMOTE_ADDRESS, clientSocket.getRemoteSocketAddress()); + message.setHeader(MLLP_AUTO_ACKNOWLEDGE, endpoint.autoAck); + + if (endpoint.validatePayload) { + String exceptionMessage = Hl7Util.generateInvalidPayloadExceptionMessage(hl7MessageBytes); + if (exceptionMessage != null) { + exchange.setException(new MllpInvalidMessageException(exceptionMessage, hl7MessageBytes)); + } + } + populateHl7DataHeaders(exchange, message, hl7MessageBytes); + + log.debug("Calling processor"); + try { + getProcessor().process(exchange); + sendAcknowledgement(hl7MessageBytes, exchange); + } catch (RuntimeException runtimeEx) { + throw runtimeEx; + } catch (Exception ex) { + log.error("Unexpected exception processing exchange", ex); + } + } catch (Exception uowEx) { + // TODO: Handle this correctly + exchange.setException(uowEx); + log.warn("Exception encountered creating Unit of Work - sending exception to route", uowEx); + try { + getProcessor().process(exchange); + } catch (Exception e) { + log.error("Exception encountered processing exchange with exception encountered createing Unit of Work", e); + } continue; + } finally { + if (exchange != null) { + doneUoW(exchange); + } } - log.debug("Populating the exchange with received message"); - Exchange exchange = endpoint.createExchange(ExchangePattern.InOut); - Message message = exchange.getIn(); - message.setBody(hl7MessageBytes, byte[].class); - message.setHeader(MLLP_LOCAL_ADDRESS, clientSocket.getLocalAddress().toString()); - message.setHeader(MLLP_REMOTE_ADDRESS, clientSocket.getRemoteSocketAddress()); - message.setHeader(MLLP_AUTO_ACKNOWLEDGE, endpoint.autoAck); + } - populateHl7DataHeaders(exchange, message, hl7MessageBytes); + log.debug("ClientSocketThread exiting"); + } - exchange.addOnCompletion(new AcknowledgmentSynchronizationAdapter(clientSocket, hl7MessageBytes)); + private void sendAcknowledgement(byte[] originalHl7MessageBytes, Exchange exchange) { + log.info("sendAcknowledgement"); + + // Check BEFORE_SEND Properties + if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)) { + String reasonMessage = String.format("Exchange property %s is %b", MLLP_RESET_CONNECTION_BEFORE_SEND, exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)); + MllpSocketUtil.reset(clientSocket, log, reasonMessage); + return; + } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)) { + String reasonMessage = String.format("Exchange property %s is %b", MLLP_CLOSE_CONNECTION_BEFORE_SEND, exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)); + MllpSocketUtil.close(clientSocket, log, reasonMessage); + return; + } + + // Find the acknowledgement body + // TODO: Enhance this to say whether or not the acknowledgment is missing or just of an un-convertible type + byte[] acknowledgementMessageBytes = exchange.getProperty(MLLP_ACKNOWLEDGEMENT, byte[].class); + String acknowledgementMessageType = null; + if (null == acknowledgementMessageBytes) { + boolean autoAck = exchange.getProperty(MLLP_AUTO_ACKNOWLEDGE, true, boolean.class); + if (!autoAck) { + exchange.setException(new MllpInvalidAcknowledgementException("Automatic Acknowledgement is disabled and the " + + MLLP_ACKNOWLEDGEMENT + " exchange property is null or cannot be converted to byte[]", originalHl7MessageBytes, acknowledgementMessageBytes)); + return; + } - log.debug("Calling processor"); + String acknowledgmentTypeProperty = exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class); try { - getProcessor().process(exchange); - } catch (RuntimeException runtimeEx) { - throw runtimeEx; - } catch (Exception ex) { - log.error("Unexpected exception processing exchange", ex); - throw new RuntimeException("Unexpected exception processing exchange", ex); + if (null == acknowledgmentTypeProperty) { + if (null == exchange.getException()) { + acknowledgementMessageType = "AA"; + acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(originalHl7MessageBytes); + } else { + acknowledgementMessageType = "AE"; + acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(originalHl7MessageBytes); + } + } else { + switch (acknowledgmentTypeProperty) { + case "AA": + acknowledgementMessageType = "AA"; + acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(originalHl7MessageBytes); + break; + case "AE": + acknowledgementMessageType = "AE"; + acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(originalHl7MessageBytes); + break; + case "AR": + acknowledgementMessageType = "AR"; + acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationRejectAcknowledgementMessage(originalHl7MessageBytes); + break; + default: + exchange.setException(new Hl7AcknowledgementGenerationException("Unsupported acknowledgment type: " + acknowledgmentTypeProperty)); + return; + } + } + } catch (Hl7AcknowledgementGenerationException ackGenerationException) { + exchange.setProperty(MLLP_ACKNOWLEDGEMENT_EXCEPTION, ackGenerationException); + exchange.setException(ackGenerationException); } + } else { + final byte bM = 77; + final byte bS = 83; + final byte bA = 65; + final byte bE = 69; + final byte bR = 82; + + final byte fieldSeparator = originalHl7MessageBytes[3]; + // Acknowledgment is specified in exchange property - determine the acknowledgement type + for (int i = 0; i < originalHl7MessageBytes.length; ++i) { + if (SEGMENT_DELIMITER == i) { + if (i + 7 < originalHl7MessageBytes.length // Make sure we don't run off the end of the message + && bM == originalHl7MessageBytes[i + 1] && bS == originalHl7MessageBytes[i + 2] + && bA == originalHl7MessageBytes[i + 3] && fieldSeparator == originalHl7MessageBytes[i + 4]) { + if (fieldSeparator != originalHl7MessageBytes[i + 7]) { + log.warn("MSA-1 is longer than 2-bytes - ignoring trailing bytes"); + } + // Found MSA - pull acknowledgement bytes + byte[] acknowledgmentTypeBytes = new byte[2]; + acknowledgmentTypeBytes[0] = originalHl7MessageBytes[i + 5]; + acknowledgmentTypeBytes[1] = originalHl7MessageBytes[i + 6]; + try { + acknowledgementMessageType = IOConverter.toString(acknowledgmentTypeBytes, exchange); + } catch (IOException ioEx) { + throw new RuntimeException("Failed to convert acknowledgement message to string", ioEx); + } + // Verify it's a valid acknowledgement code + if (bA != acknowledgmentTypeBytes[0]) { + switch (acknowledgementMessageBytes[1]) { + case bA: + case bR: + case bE: + break; + default: + log.warn("Invalid acknowledgement type [" + acknowledgementMessageType + "] found in message - should be AA, AE or AR"); + } + } + + // if the MLLP_ACKNOWLEDGEMENT_TYPE property is set on the exchange, make sure it matches + String acknowledgementTypeProperty = exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class); + if (null != acknowledgementTypeProperty && !acknowledgementTypeProperty.equals(acknowledgementMessageType)) { + log.warn("Acknowledgement type found in message [" + acknowledgementMessageType + "] does not match " + + MLLP_ACKNOWLEDGEMENT_TYPE + " exchange property value [" + acknowledgementTypeProperty + "] - using value found in message"); + } + } + } + } } - log.debug("ClientSocketThread exiting"); + Message message; + if (exchange.hasOut()) { + message = exchange.getOut(); + } else { + message = exchange.getIn(); + } + message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementMessageBytes); + // TODO: Use the charset of the exchange + message.setHeader(MLLP_ACKNOWLEDGEMENT_STRING, new String(acknowledgementMessageBytes)); + message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementMessageType); + + // Send the acknowledgement + log.debug("Sending Acknowledgement: {}", MllpComponent.covertBytesToPrintFriendlyString(acknowledgementMessageBytes)); + try { + mllpSocketWriter.writeEnvelopedPayload(originalHl7MessageBytes, acknowledgementMessageBytes); + } catch (MllpException mllpEx) { + log.error("MLLP Acknowledgement failure: {}", mllpEx); + MllpAcknowledgementDeliveryException deliveryException = new MllpAcknowledgementDeliveryException(originalHl7MessageBytes, acknowledgementMessageBytes, mllpEx); + exchange.setProperty(MLLP_ACKNOWLEDGEMENT_EXCEPTION, deliveryException); + exchange.setException(deliveryException); + } + + // Check AFTER_SEND Properties + if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) { + String reasonMessage = String.format("Exchange property %s is %b", MLLP_RESET_CONNECTION_AFTER_SEND, exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)); + MllpSocketUtil.reset(clientSocket, log, reasonMessage); + return; + } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) { + String reasonMessage = String.format("Exchange property %s is %b", MLLP_CLOSE_CONNECTION_AFTER_SEND, exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)); + MllpSocketUtil.reset(clientSocket, log, reasonMessage); + } } private void populateHl7DataHeaders(Exchange exchange, Message message, byte[] hl7MessageBytes) { + if (hl7MessageBytes == null || hl7MessageBytes.length < 8) { + // Not enough data to populate anything - just return + return; + } // Find the end of the MSH and indexes of the fields in the MSH to populate message headers final byte fieldSeparator = hl7MessageBytes[3]; int endOfMSH = -1; - List<Integer> fieldSeparatorIndexes = new ArrayList<>(10); // We need at least 10 fields to create the acknowledgment + List<Integer> fieldSeparatorIndexes = new ArrayList<>(10); // We should have at least 10 fields for (int i = 0; i < hl7MessageBytes.length; ++i) { if (fieldSeparator == hl7MessageBytes[i]) { @@ -541,11 +733,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { @Override public void interrupt() { if (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) { - try { - clientSocket.close(); - } catch (IOException ex) { - log.warn("Exception encoutered closing client Socket in interrupt", ex); - } + MllpSocketUtil.close(clientSocket, log, this.getClass().getSimpleName() + " interrupted"); } super.interrupt(); }
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java index 7c2014a..b1e4906 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java @@ -17,43 +17,45 @@ package org.apache.camel.component.mllp; /** - * Raised when a MLLP Producer or Consumer encounter a timeout reading a message + * Raised when a MLLP Producer or Consumer encounter a timeout reading a message or an acknowledgment */ public class MllpTimeoutException extends MllpException { - private final byte[] hl7Message; + static final String EXCEPTION_MESSAGE = "Timeout receiving HL7 Message"; - public MllpTimeoutException(String message, byte[] hl7Message) { - super(message); - this.hl7Message = hl7Message; + public MllpTimeoutException(byte[] partialHl7Message) { + super(EXCEPTION_MESSAGE, partialHl7Message); } - public MllpTimeoutException(String message, byte[] hl7Message, Throwable cause) { - super(message, cause); - this.hl7Message = hl7Message; + public MllpTimeoutException(String message, byte[] partialHl7Message) { + super(message, partialHl7Message); } - public byte[] getHl7Message() { - return hl7Message; + public MllpTimeoutException(byte[] partialHl7Message, Throwable cause) { + super(EXCEPTION_MESSAGE, partialHl7Message, cause); } - @Override - public String getMessage() { - if (isLogPhi()) { - return String.format("%s:\n\tHL7 Message: %s", super.getMessage(), covertBytesToPrintFriendlyString(hl7Message)); - } else { - return super.getMessage(); - } + public MllpTimeoutException(String message, byte[] partialHl7Message, Throwable cause) { + super(message, partialHl7Message, cause); } - @Override - public String toString() { - StringBuilder stringBuilder = new StringBuilder(this.getClass().getName()); + protected MllpTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement) { + super(message, hl7Message, partialHl7Acknowledgement); + } - stringBuilder.append(": {hl7Message=") - .append(covertBytesToPrintFriendlyString(hl7Message)) - .append("}"); + protected MllpTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement, Throwable cause) { + super(message, hl7Message, partialHl7Acknowledgement, cause); + } - return stringBuilder.toString(); + /** + * Get the HL7 message payload associated with this exception, if any. + * + * @return If the timeout occurred while attempting to receive an HL7 Message, this will be null. If the timeout + * occurred while attempting to receive an HL7 Acknowledgement, this will be the HL7 Message. If the timeout occurred + * while attempting to complete the read of an HL7 message (i.e. part of the message has already been read), this + * will be the partial acknowledgement payload that was read before the timeout. + */ + public byte[] getHl7Message() { + return super.getHl7Message(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java index dd5bf42..2661d7c 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java @@ -20,40 +20,19 @@ package org.apache.camel.component.mllp; * Raised when a MLLP Producer or consumer encounter an error transmitting data */ public class MllpWriteException extends MllpException { - private final byte[] mllpPayload; - - public MllpWriteException(String message, byte[] mllpPayload) { - super(message); - this.mllpPayload = mllpPayload; - } - - public MllpWriteException(String message, byte[] mllpPayload, Throwable cause) { - super(message, cause); - this.mllpPayload = mllpPayload; + public MllpWriteException(String message, byte[] hl7Message) { + super(message, hl7Message); } - public byte[] getMllpPayload() { - return mllpPayload; + public MllpWriteException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) { + super(message, hl7Message, hl7Acknowledgement); } - @Override - public String getMessage() { - if (isLogPhi()) { - return String.format("%s:\n\tMLLP Payload: %s", super.getMessage(), covertBytesToPrintFriendlyString(mllpPayload)); - } else { - return super.getMessage(); - } + public MllpWriteException(String message, byte[] hl7Message, Throwable cause) { + super(message, hl7Message, cause); } - @Override - public String toString() { - StringBuilder stringBuilder = new StringBuilder(this.getClass().getName()); - - stringBuilder.append(": {mllpPayload=") - .append(covertBytesToPrintFriendlyString(mllpPayload)) - .append("}"); - - return stringBuilder.toString(); + public MllpWriteException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) { + super(message, hl7Message, hl7Acknowledgement, cause); } - } http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/AcknowledgmentSynchronizationAdapter.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/AcknowledgmentSynchronizationAdapter.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/AcknowledgmentSynchronizationAdapter.java deleted file mode 100644 index c7916d4..0000000 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/AcknowledgmentSynchronizationAdapter.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.mllp.impl; - -import java.io.IOException; -import java.net.Socket; - -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.Route; -import org.apache.camel.component.mllp.MllpAcknowledgementDeliveryException; -import org.apache.camel.component.mllp.MllpException; -import org.apache.camel.component.mllp.MllpInvalidAcknowledgementException; -import org.apache.camel.converter.IOConverter; -import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerationException; -import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerator; -import org.apache.camel.support.SynchronizationAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT; -import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION; -import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE; -import static org.apache.camel.component.mllp.MllpConstants.MLLP_AUTO_ACKNOWLEDGE; -import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_AFTER_SEND; -import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_BEFORE_SEND; -import static org.apache.camel.component.mllp.MllpConstants.MLLP_RESET_CONNECTION_AFTER_SEND; -import static org.apache.camel.component.mllp.MllpConstants.MLLP_RESET_CONNECTION_BEFORE_SEND; -import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER; - -public class AcknowledgmentSynchronizationAdapter extends SynchronizationAdapter { - Logger log = LoggerFactory.getLogger(this.getClass()); - final byte[] originalHl7MessageBytes; - Hl7AcknowledgementGenerator acknowledgementGenerator = new Hl7AcknowledgementGenerator(); - private Socket clientSocket; - - public AcknowledgmentSynchronizationAdapter(Socket clientSocket, byte[] hl7MessageBytes) { - this.clientSocket = clientSocket; - this.originalHl7MessageBytes = hl7MessageBytes; - } - - @Override - public int getOrder() { - return HIGHEST; - } - - @Override - public void onAfterRoute(Route route, Exchange exchange) { - log.info("onAfterRoute"); - - // Check BEFORE_SEND Properties - if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)) { - MllpUtil.resetConnection(clientSocket); - return; - } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)) { - MllpUtil.closeConnection(clientSocket); - return; - } - - // Find the acknowledgement body - // TODO: Enhance this to say whether or not the acknowledgment is missing or just of an unconvertible type - byte[] acknowledgementMessageBytes = exchange.getProperty(MLLP_ACKNOWLEDGEMENT, byte[].class); - String acknowledgementMessageType = null; - if (null == acknowledgementMessageBytes) { - boolean autoAck = exchange.getProperty(MLLP_AUTO_ACKNOWLEDGE, true, boolean.class); - if (!autoAck) { - exchange.setException(new MllpInvalidAcknowledgementException("Automatic Acknowledgement is disabled and the " - + MLLP_ACKNOWLEDGEMENT + " exchange property is null or cannot be converted to byte[]", originalHl7MessageBytes, acknowledgementMessageBytes)); - return; - } - - String acknowledgmentTypeProperty = exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class); - try { - if (null == acknowledgmentTypeProperty) { - if (null == exchange.getException()) { - acknowledgementMessageType = "AA"; - acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(originalHl7MessageBytes); - } else { - acknowledgementMessageType = "AE"; - acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(originalHl7MessageBytes); - } - } else { - switch (acknowledgmentTypeProperty) { - case "AA": - acknowledgementMessageType = "AA"; - acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(originalHl7MessageBytes); - break; - case "AE": - acknowledgementMessageType = "AE"; - acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(originalHl7MessageBytes); - break; - case "AR": - acknowledgementMessageType = "AR"; - acknowledgementMessageBytes = acknowledgementGenerator.generateApplicationRejectAcknowledgementMessage(originalHl7MessageBytes); - break; - default: - exchange.setException(new Hl7AcknowledgementGenerationException("Unsupported acknowledgment type: " + acknowledgmentTypeProperty)); - return; - } - } - } catch (Hl7AcknowledgementGenerationException ackGenerationException) { - exchange.setProperty(MLLP_ACKNOWLEDGEMENT_EXCEPTION, ackGenerationException); - exchange.setException(ackGenerationException); - } - } else { - final byte bM = 77; - final byte bS = 83; - final byte bA = 65; - final byte bE = 69; - final byte bR = 82; - - final byte fieldSeparator = originalHl7MessageBytes[3]; - // Acknowledgment is specified in exchange property - determine the acknowledgement type - for (int i = 0; i < originalHl7MessageBytes.length; ++i) { - if (SEGMENT_DELIMITER == i) { - if (i + 7 < originalHl7MessageBytes.length // Make sure we don't run off the end of the message - && bM == originalHl7MessageBytes[i + 1] && bS == originalHl7MessageBytes[i + 2] - && bA == originalHl7MessageBytes[i + 3] && fieldSeparator == originalHl7MessageBytes[i + 4]) { - if (fieldSeparator != originalHl7MessageBytes[i + 7]) { - log.warn("MSA-1 is longer than 2-bytes - ignoring trailing bytes"); - } - // Found MSA - pull acknowledgement bytes - byte[] acknowledgmentTypeBytes = new byte[2]; - acknowledgmentTypeBytes[0] = originalHl7MessageBytes[i + 5]; - acknowledgmentTypeBytes[1] = originalHl7MessageBytes[i + 6]; - try { - acknowledgementMessageType = IOConverter.toString(acknowledgmentTypeBytes, exchange); - } catch (IOException ioEx) { - throw new RuntimeException("Failed to convert acknowledgement message to string", ioEx); - } - - // Verify it's a valid acknowledgement code - if (bA != acknowledgmentTypeBytes[0]) { - switch (acknowledgementMessageBytes[1]) { - case bA: - case bR: - case bE: - break; - default: - log.warn("Invalid acknowledgement type [" + acknowledgementMessageType + "] found in message - should be AA, AE or AR"); - } - } - - // if the MLLP_ACKNOWLEDGEMENT_TYPE property is set on the exchange, make sure it matches - String acknowledgementTypeProperty = exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class); - if (null != acknowledgementTypeProperty && !acknowledgementTypeProperty.equals(acknowledgementMessageType)) { - log.warn("Acknowledgement type found in message [" + acknowledgementMessageType + "] does not match " - + MLLP_ACKNOWLEDGEMENT_TYPE + " exchange property value [" + acknowledgementTypeProperty + "] - using value found in message"); - } - } - } - } - } - - Message message; - if (exchange.hasOut()) { - message = exchange.getOut(); - } else { - message = exchange.getIn(); - } - message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementMessageBytes); - message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementMessageType); - - // Send the acknowledgement - log.debug("Sending Acknowledgement"); - try { - MllpUtil.writeFramedPayload(clientSocket, acknowledgementMessageBytes); - } catch (MllpException mllpEx) { - log.error("MLLP Acknowledgement failure: {}", mllpEx); - MllpAcknowledgementDeliveryException deliveryException = new MllpAcknowledgementDeliveryException(originalHl7MessageBytes, acknowledgementMessageBytes, mllpEx); - exchange.setProperty(MLLP_ACKNOWLEDGEMENT_EXCEPTION, deliveryException); - exchange.setException(deliveryException); - } - - // Check AFTER_SEND Properties - if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) { - MllpUtil.resetConnection(clientSocket); - return; - } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) { - MllpUtil.closeConnection(clientSocket); - } - - super.onAfterRoute(route, exchange); - } - - @Override - public void onComplete(Exchange exchange) { - log.info("onComplete"); - super.onComplete(exchange); - - } - - @Override - public void onFailure(Exchange exchange) { - log.warn("onFailure"); - super.onFailure(exchange); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/Hl7Util.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/Hl7Util.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/Hl7Util.java new file mode 100644 index 0000000..0c762cb --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/Hl7Util.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK; +import static org.apache.camel.component.mllp.MllpEndpoint.MESSAGE_TERMINATOR; +import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER; +import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK; + +public final class Hl7Util { + static final Logger LOG = LoggerFactory.getLogger(Hl7Util.class); + + private Hl7Util() { + } + + public static String generateInvalidPayloadExceptionMessage(final byte[] hl7Bytes) { + if (hl7Bytes == null) { + return "HL7 payload is null"; + } + + return generateInvalidPayloadExceptionMessage(hl7Bytes, hl7Bytes.length); + } + + /** + * Verifies that the HL7 payload array + * <p> + * The MLLP protocol does not allow embedded START_OF_BLOCK or END_OF_BLOCK characters. The END_OF_DATA character + * is allowed (and expected) because it is also the segment delimiter for an HL7 message + * + * @param hl7Bytes the HL7 payload to validate + * @return If the payload is invalid, an error message suitable for inclusion in an exception is returned. If + * the payload is valid, null is returned; + */ + public static String generateInvalidPayloadExceptionMessage(final byte[] hl7Bytes, final int length) { + if (hl7Bytes == null) { + return "HL7 payload is null"; + } + + if (hl7Bytes.length <= 0) { + return "HL7 payload is empty"; + } + + if (length > hl7Bytes.length) { + LOG.warn("The length specified for the HL7 payload array <{}> is greater than the actual length of the array <{}> - only validating {} bytes", length, hl7Bytes.length, length); + } + + if (hl7Bytes.length < 3 || hl7Bytes[0] != 'M' || hl7Bytes[1] != 'S' || hl7Bytes[2] != 'H') { + return String.format("The first segment of the HL7 payload {%s} is not an MSH segment", new String(hl7Bytes, 0, Math.min(3, hl7Bytes.length))); + } + + int validationLength = Math.min(length, hl7Bytes.length); + + if (hl7Bytes[validationLength - 2] != SEGMENT_DELIMITER || hl7Bytes[validationLength - 1] != MESSAGE_TERMINATOR) { + String format = "The HL7 payload terminating bytes [%#x, %#x] are incorrect - expected [%#x, %#x] {ASCII [<CR>, <LF>]}"; + return String.format(format, hl7Bytes[validationLength - 2], hl7Bytes[validationLength - 1], (byte) SEGMENT_DELIMITER, (byte) MESSAGE_TERMINATOR); + } + + for (int i = 0; i < validationLength; ++i) { + switch (hl7Bytes[i]) { + case START_OF_BLOCK: + return String.format("HL7 payload contains an embedded START_OF_BLOCK {%#x, ASCII <VT>} at index %d", hl7Bytes[i], i); + case END_OF_BLOCK: + return String.format("HL7 payload contains an embedded END_OF_BLOCK {%#x, ASCII <FS>} at index %d", hl7Bytes[i], i); + default: + // continue on + } + } + + return null; + } + + +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpBufferedSocketWriter.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpBufferedSocketWriter.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpBufferedSocketWriter.java new file mode 100644 index 0000000..46db872 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpBufferedSocketWriter.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp.impl; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketException; + +import org.apache.camel.component.mllp.MllpAcknowledgementDeliveryException; +import org.apache.camel.component.mllp.MllpException; +import org.apache.camel.component.mllp.MllpWriteException; + +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK; +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA; +import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK; + +public class MllpBufferedSocketWriter extends MllpSocketWriter { + static final int DEFAULT_SO_SNDBUF = 65535; + + ByteArrayOutputStream outputBuffer; + + public MllpBufferedSocketWriter(Socket socket, boolean acknowledgementWriter) { + super(socket, acknowledgementWriter); + try { + outputBuffer = new ByteArrayOutputStream(socket.getSendBufferSize()); + } catch (SocketException socketEx) { + log.warn(String.format("Ignoring exception encountered retrieving SO_SNDBUF from the socket - using default size of %d bytes", DEFAULT_SO_SNDBUF), socketEx); + outputBuffer = new ByteArrayOutputStream(DEFAULT_SO_SNDBUF); + } + } + + @Override + public void writeEnvelopedPayload(byte[] hl7MessageBytes, byte[] hl7AcknowledgementBytes) throws MllpException { + if (socket == null) { + final String errorMessage = "Socket is null"; + if (isAcknowledgementWriter()) { + throw new MllpAcknowledgementDeliveryException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } else { + throw new MllpWriteException(errorMessage, hl7MessageBytes); + } + } else if (!socket.isConnected()) { + final String errorMessage = "Socket is not connected"; + if (isAcknowledgementWriter()) { + throw new MllpAcknowledgementDeliveryException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } else { + throw new MllpWriteException(errorMessage, hl7MessageBytes); + } + } else if (socket.isClosed()) { + final String errorMessage = "Socket is closed"; + if (isAcknowledgementWriter()) { + throw new MllpAcknowledgementDeliveryException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } else { + throw new MllpWriteException(errorMessage, hl7MessageBytes); + } + } + + OutputStream socketOutputStream = null; + try { + socketOutputStream = socket.getOutputStream(); + } catch (IOException e) { + final String errorMessage = "Failed to retrieve the OutputStream from the Socket"; + if (isAcknowledgementWriter()) { + throw new MllpAcknowledgementDeliveryException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } else { + throw new MllpWriteException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } + } + + outputBuffer.write(START_OF_BLOCK); + + if (isAcknowledgementWriter()) { + if (hl7AcknowledgementBytes == null) { + log.warn("HL7 Acknowledgement payload is null - sending empty MLLP payload"); + } else if (hl7AcknowledgementBytes.length <= 0) { + log.warn("HL7 Acknowledgement payload is empty - sending empty MLLP payload"); + } else { + outputBuffer.write(hl7AcknowledgementBytes, 0, hl7AcknowledgementBytes.length); + } + } else { + if (hl7MessageBytes == null) { + log.warn("HL7 Message payload is null - sending empty MLLP payload"); + } else if (hl7MessageBytes.length <= 0) { + log.warn("HL7 Message payload is empty - sending empty MLLP payload"); + } else { + outputBuffer.write(hl7MessageBytes, 0, hl7MessageBytes.length); + } + } + + outputBuffer.write(END_OF_BLOCK); + outputBuffer.write(END_OF_DATA); + + try { + outputBuffer.writeTo(socketOutputStream); + socketOutputStream.flush(); + } catch (IOException e) { + final String errorMessage = "Failed to write the MLLP payload to the Socket's OutputStream"; + if (isAcknowledgementWriter()) { + throw new MllpAcknowledgementDeliveryException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } else { + throw new MllpWriteException(errorMessage, hl7MessageBytes); + } + } finally { + outputBuffer.reset(); + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketReader.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketReader.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketReader.java new file mode 100644 index 0000000..7d10cce --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketReader.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp.impl; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; + +import org.apache.camel.component.mllp.MllpAcknowledgementTimeoutException; +import org.apache.camel.component.mllp.MllpComponent; +import org.apache.camel.component.mllp.MllpException; +import org.apache.camel.component.mllp.MllpReceiveAcknowledgementException; +import org.apache.camel.component.mllp.MllpReceiveException; +import org.apache.camel.component.mllp.MllpTimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK; + +public class MllpSocketReader { + final Socket socket; + final int receiveTimeout; + final int readTimeout; + final boolean acknowledgementReader; + Logger log = LoggerFactory.getLogger(this.getClass()); + byte[] receiveBuffer; + ByteArrayOutputStream readAdditionalStream; + + public MllpSocketReader(Socket socket, int receiveTimeout, int readTimeout, boolean acknowledgementReader) { + this.socket = socket; + this.receiveTimeout = receiveTimeout; + this.readTimeout = readTimeout; + this.acknowledgementReader = acknowledgementReader; + try { + receiveBuffer = new byte[socket.getReceiveBufferSize()]; + } catch (SocketException socketEx) { + throw new IllegalStateException("Cannot retrieve the value of SO_RCVBUF from the Socket", socketEx); + } + } + + + public byte[] readEnvelopedPayload() throws MllpException { + return readEnvelopedPayload(null, null); + } + + public byte[] readEnvelopedPayload(byte[] hl7MessageBytes) throws MllpException { + return readEnvelopedPayload(null, hl7MessageBytes); + } + + public byte[] readEnvelopedPayload(Integer initialByte) throws MllpException { + return readEnvelopedPayload(initialByte, null); + } + + protected byte[] readEnvelopedPayload(Integer initialByte, byte[] hl7MessageBytes) throws MllpException { + byte[] answer = null; + + MllpSocketUtil.setSoTimeout(socket, receiveTimeout, log, "Preparing to receive payload"); + + InputStream socketInputStream = null; + try { + socketInputStream = socket.getInputStream(); + } catch (IOException ioEx) { + final String errorMessage = "Failed to retrieve the InputStream from the Socket"; + resetConnection(errorMessage); + throw isAcknowledgementReader() + ? new MllpReceiveAcknowledgementException(errorMessage, hl7MessageBytes, ioEx) + : new MllpReceiveException(errorMessage, ioEx); + } + + // Read the acknowledgment - hopefully in one shot + int readCount; + int startPosition = (initialByte != null && initialByte == START_OF_BLOCK) ? 0 : -1; + do { // Read from the socket until the beginning of a MLLP payload is found or a timeout occurs + try { + readCount = socketInputStream.read(receiveBuffer); + if (readCount == -1) { + String errorMessage = "END_OF_STREAM encountered while attempting to receive payload - was Socket closed?"; + resetConnection(errorMessage); + throw isAcknowledgementReader() + ? new MllpReceiveAcknowledgementException(errorMessage, hl7MessageBytes) + : new MllpReceiveException(errorMessage); + } else if (log.isTraceEnabled()) { + log.trace("Received bytes: {}", MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, 0, readCount)); + } + } catch (SocketTimeoutException timeoutEx) { + if (isAcknowledgementReader()) { + throw new MllpAcknowledgementTimeoutException(hl7MessageBytes, timeoutEx); + } else { + if (initialByte != null && initialByte == START_OF_BLOCK) { + answer = new byte[1]; + answer[0] = initialByte.byteValue(); + throw new MllpTimeoutException(answer, timeoutEx); + } + + return null; + } + } catch (IOException ioEx) { + String errorMessage = "Error receiving payload"; + log.error(errorMessage, ioEx); + resetConnection(errorMessage); + throw isAcknowledgementReader() + ? new MllpReceiveAcknowledgementException(errorMessage, hl7MessageBytes, ioEx) + : new MllpReceiveException(errorMessage, ioEx); + } + + if (readCount > 0) { // If some data was read, make sure we found the beginning of the message + if (initialByte != null && initialByte == START_OF_BLOCK) { + startPosition = 0; + } else { + int startOfBlock = MllpSocketUtil.findStartOfBlock(receiveBuffer, readCount); + startPosition = (startOfBlock == -1) ? -1 : startOfBlock + 1; + } + if (startPosition > 1) { + // Some out-of-band data was received - log it + final String format = "Ignoring {} out-of-band bytes received before the beginning of the payload"; + int length = readCount - startPosition - 1; + if (MllpComponent.isLogPhi()) { + log.warn(format + ": {}", length, MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, 0, length)); + } else { + log.warn(format, length); + } + } + } + } while (startPosition == -1); + + // Check to see if the payload is complete + int endPosition = MllpSocketUtil.findEndOfMessage(receiveBuffer, readCount); + + if (endPosition != -1) { + // We have a complete payload - build the result without delimiters + if (endPosition < readCount - 3) { + // Some out-of-band data was received - log it + final String format = "Ignoring {} out-of-band bytes received after the end of the payload"; + int length = readCount - endPosition - 2; + if (MllpComponent.isLogPhi()) { + log.warn(format + ": {}", length, MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, endPosition + 1, length)); + } else { + log.warn(format, length); + } + } + + // Build the answer + int length = endPosition - startPosition; + answer = new byte[length]; + System.arraycopy(receiveBuffer, startPosition, answer, 0, length); + } else { + // The payload is incomplete - read it all before returning + + // Write the data already received to the overflow stream, without the beginning delimiters + getReadAdditionalStream().reset(); + readAdditionalStream.write(receiveBuffer, startPosition, readCount - startPosition); + + // We've already received some data, so switch to the read timeout + MllpSocketUtil.setSoTimeout(socket, readTimeout, log, "Preparing to continue reading payload"); + + // Now the current data is in the overflow stream, continue reading until the end of the payload is found or a timeout occurs + endPosition = -1; + do { // Read from the socket until the end of the MLLP payload is found or a timeout occurs + try { + readCount = socketInputStream.read(receiveBuffer); + if (readCount == -1) { + String errorMessage = "END_OF_STREAM encountered while attempting to read the end of the payload - Socket was closed or reset"; + resetConnection(errorMessage); + byte[] partialPayload = (readAdditionalStream.size() > 0) ? readAdditionalStream.toByteArray() : null; + throw isAcknowledgementReader() + ? new MllpReceiveAcknowledgementException(errorMessage, hl7MessageBytes, partialPayload) + : new MllpReceiveException(errorMessage, partialPayload); + } else if (log.isTraceEnabled()) { + log.trace("Read additional bytes: {}", MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, 0, readCount)); + } + } catch (SocketTimeoutException timeoutEx) { + String errorMessage = "Timeout reading the end of the payload"; + resetConnection(errorMessage); + byte[] partialPayload = (readAdditionalStream.size() > 0) ? readAdditionalStream.toByteArray() : null; + throw isAcknowledgementReader() + ? new MllpAcknowledgementTimeoutException(errorMessage, hl7MessageBytes, partialPayload, timeoutEx) + : new MllpTimeoutException(errorMessage, partialPayload, timeoutEx); + } catch (IOException ioEx) { + String errorMessage = "Error reading the end of the payload"; + resetConnection(errorMessage); + log.error(errorMessage); + byte[] partialPayload = (readAdditionalStream.size() > 0) ? readAdditionalStream.toByteArray() : null; + throw isAcknowledgementReader() + ? new MllpReceiveAcknowledgementException(errorMessage, hl7MessageBytes, partialPayload, ioEx) + : new MllpReceiveException(errorMessage, partialPayload, ioEx); + } + if (readCount > 0) { // If some data was read, make sure we found the end of the message + endPosition = MllpSocketUtil.findEndOfMessage(receiveBuffer, readCount); + if (endPosition != -1) { + if (endPosition < readCount - 2) { + final String format = "Ignoring {} out-of-band bytes after the end of the payload"; + int length = readCount - endPosition - 2; + if (MllpComponent.isLogPhi()) { + log.warn(format + ": {}", length, MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, endPosition + 2, length)); + } else { + log.warn(format, length); + } + } + readAdditionalStream.write(receiveBuffer, 0, endPosition); + } else { + readAdditionalStream.write(receiveBuffer, 0, readCount); + } + } + } while (endPosition == -1); + + // All available data has been read - return the data + answer = readAdditionalStream.toByteArray(); + } + + // Check to see if there is any more data available + int availableCount; + do { + try { + availableCount = socketInputStream.available(); + } catch (IOException ioEx) { + log.warn("Ignoring IOException encountered while checking for additional available trailing bytes", ioEx); + break; + } + if (availableCount > 0) { // if data is available, eat it + try { + readCount = socketInputStream.read(receiveBuffer); + final String format = "Ignoring {} out-of-band bytes trailing after the end of the payload"; + if (MllpComponent.isLogPhi()) { + log.warn(format + ": {}", readCount, MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, 0, readCount)); + } else { + log.warn(format, readCount); + } + } catch (IOException ioEx) { + log.warn(String.format("Ignoring IOException encountered while attempting to read %d bytes of trailing data", availableCount), ioEx); + break; + } + } + } while (availableCount != 0); + + return answer; + } + + public void closeConnection(String reasonMessage) { + MllpSocketUtil.close(socket, log, reasonMessage); + } + + public void resetConnection(String reasonMessage) { + MllpSocketUtil.reset(socket, log, reasonMessage); + } + + public Socket getSocket() { + return socket; + } + + public int getReceiveTimeout() { + return receiveTimeout; + } + + public int getReadTimeout() { + return readTimeout; + } + + public byte[] getReceiveBuffer() { + return receiveBuffer; + } + + public boolean isAcknowledgementReader() { + return acknowledgementReader; + } + + public ByteArrayOutputStream getReadAdditionalStream() { + if (readAdditionalStream == null) { + readAdditionalStream = new ByteArrayOutputStream(receiveBuffer.length); + } + + return readAdditionalStream; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketUtil.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketUtil.java new file mode 100644 index 0000000..fab33a8 --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketUtil.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp.impl; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK; +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA; +import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK; + +public final class MllpSocketUtil { + private static final Logger LOG = LoggerFactory.getLogger(MllpSocketUtil.class); + + private MllpSocketUtil() { + } + + public static void setSoTimeout(Socket socket, int timeout, Logger logger, String reasonMessage) { + if (logger != null && logger.isDebugEnabled()) { + final String format = "Setting SO_TIMEOUT to {} for connection {}"; + if (reasonMessage != null && !reasonMessage.isEmpty()) { + logger.debug(format + " Reason: {}", timeout, getAddressString(socket), reasonMessage); + } else { + logger.debug(format, timeout, getAddressString(socket)); + } + } + try { + socket.setSoTimeout(timeout); + } catch (SocketException socketEx) { + if (logger != null) { + final String format = "Ignoring SocketException encountered setting SO_TIMEOUT to %d for connection %s."; + if (reasonMessage != null && !reasonMessage.isEmpty()) { + logger.warn(String.format(format + " Reason: %s", timeout, getAddressString(socket), reasonMessage), socketEx); + } else { + logger.warn(String.format(format, timeout, getAddressString(socket)), socketEx); + } + } + } + } + + public static void close(Socket socket, Logger logger, String reasonMessage) { + if (socket != null && socket.isConnected() && !socket.isClosed()) { + final String format = "Closing connection {}"; + + String address = getAddressString(socket); + + if (logger != null) { + if (reasonMessage != null && !reasonMessage.isEmpty()) { + logger.warn(format + ". Reason: {}", address, reasonMessage); + } else { + logger.warn(format, address); + } + } + + if (!socket.isInputShutdown()) { + try { + socket.shutdownInput(); + } catch (Exception ex) { + String logMessage = String.format("Ignoring Exception encountered shutting down the input stream on the client socket %s", address); + if (logger != null) { + logger.warn(logMessage, ex); + } else { + LOG.warn(logMessage, ex); + } + } + } + + if (!socket.isOutputShutdown()) { + try { + socket.shutdownOutput(); + } catch (Exception ex) { + String logMessage = String.format("Ignoring Exception encountered shutting down the output stream on the client socket %s", address); + if (logger != null) { + logger.warn(logMessage, ex); + } else { + LOG.warn(logMessage, ex); + } + } + } + + try { + socket.close(); + } catch (IOException ioEx) { + String logMessage = String.format("Ignoring IOException encountered while closing connection %s", address); + if (logger != null) { + logger.warn(logMessage, ioEx); + } else { + LOG.warn(logMessage, ioEx); + } + } + } + } + + public static void reset(Socket socket, Logger logger, String reasonMessage) { + if (socket != null && socket.isConnected() && !socket.isClosed()) { + final String format = "Resetting connection {}"; + + String address = getAddressString(socket); + + if (logger != null) { + if (reasonMessage != null && !reasonMessage.isEmpty()) { + logger.warn(format + ". Reason: {}", address, reasonMessage); + } else { + logger.warn(format, address); + } + } + + try { + socket.setSoLinger(true, 0); + } catch (SocketException socketEx) { + String logMessage = String.format("Ignoring SocketException encountered setting SO_LINGER in preparation for resetting connection %s", address); + if (logger != null) { + logger.warn(logMessage, socketEx); + } else { + LOG.warn(logMessage, socketEx); + } + } + + try { + socket.close(); + } catch (IOException ioEx) { + String logMessage = String.format("Ignoring IOException encountered while resetting connection %s", address); + if (logger != null) { + logger.warn(logMessage, ioEx); + } else { + LOG.warn(logMessage, ioEx); + } + } + } + } + + public static String getAddressString(Socket socket) { + String localAddressString = "null"; + String remoteAddressString = "null"; + + if (socket != null) { + SocketAddress localSocketAddress = socket.getLocalSocketAddress(); + if (localSocketAddress != null) { + localAddressString = localSocketAddress.toString(); + } + + SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress(); + if (remoteSocketAddress != null) { + remoteAddressString = remoteSocketAddress.toString(); + } + } + + return String.format("%s -> %s", localAddressString, remoteAddressString); + } + + public static int findStartOfBlock(byte[] payload) { + if (payload != null) { + return findStartOfBlock(payload, payload.length); + } + + return -1; + } + + /** + * Find the beginning of the HL7 Payload + * <p> + * Searches the payload from the beginning, looking for the START_OF_BLOCK character. + * + * @param payload the payload to check + * @param length the current valid length of the receive buffer + * @return the index of the START_OF_BLOCK, or -1 if not found + */ + public static int findStartOfBlock(byte[] payload, int length) { + if (payload != null && length >= 0) { + for (int i = 0; i < Math.min(length, payload.length); ++i) { + if (payload[i] == START_OF_BLOCK) { + return i; + } + } + } + + return -1; + } + + public static int findEndOfMessage(byte[] payload) { + if (payload != null) { + return findEndOfMessage(payload, payload.length); + } + + return -1; + } + + /** + * Find the end of the HL7 Payload + * <p> + * Searches the payload from the end, looking for the [END_OF_BLOCK, END_OF_DATA] characters. + * + * @param payload the payload to check + * @param length the current valid length of the receive buffer + * @return the index of the END_OF_BLOCK character that terminates the message, or -1 if not found + */ + public static int findEndOfMessage(byte[] payload, int length) { + if (payload != null && length >= 0) { + for (int i = Math.min(length, payload.length) - 1; i > 0; --i) { + if (payload[i] == END_OF_DATA) { + if (i > 0 && payload[i - 1] == END_OF_BLOCK) { + return i - 1; + } + } + } + } + + return -1; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketWriter.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketWriter.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketWriter.java new file mode 100644 index 0000000..bd9515c --- /dev/null +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketWriter.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mllp.impl; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; + +import org.apache.camel.component.mllp.MllpAcknowledgementDeliveryException; +import org.apache.camel.component.mllp.MllpException; +import org.apache.camel.component.mllp.MllpWriteException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK; +import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA; +import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK; + +public class MllpSocketWriter { + static final byte[] PAYLOAD_TERMINATOR; + + static { + PAYLOAD_TERMINATOR = new byte[2]; + PAYLOAD_TERMINATOR[0] = END_OF_BLOCK; + PAYLOAD_TERMINATOR[1] = END_OF_DATA; + } + + final Socket socket; + final boolean acknowledgementWriter; + + Logger log = LoggerFactory.getLogger(this.getClass()); + + public MllpSocketWriter(Socket socket, boolean acknowledgementWriter) { + this.socket = socket; + this.acknowledgementWriter = acknowledgementWriter; + } + + public void writeEnvelopedPayload(byte[] hl7MessageBytes, byte[] hl7AcknowledgementBytes) throws MllpException { + if (socket == null) { + final String errorMessage = "Socket is null"; + if (isAcknowledgementWriter()) { + throw new MllpAcknowledgementDeliveryException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } else { + throw new MllpWriteException(errorMessage, hl7MessageBytes); + } + } else if (!socket.isConnected()) { + final String errorMessage = "Socket is not connected"; + if (isAcknowledgementWriter()) { + throw new MllpAcknowledgementDeliveryException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } else { + throw new MllpWriteException(errorMessage, hl7MessageBytes); + } + } else if (socket.isClosed()) { + final String errorMessage = "Socket is closed"; + if (isAcknowledgementWriter()) { + throw new MllpAcknowledgementDeliveryException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } else { + throw new MllpWriteException(errorMessage, hl7MessageBytes); + } + } + + OutputStream socketOutputStream = null; + try { + socketOutputStream = socket.getOutputStream(); + } catch (IOException e) { + final String errorMessage = "Failed to retrieve the OutputStream from the Socket"; + if (isAcknowledgementWriter()) { + throw new MllpAcknowledgementDeliveryException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } else { + throw new MllpWriteException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } + } + + try { + socketOutputStream.write(START_OF_BLOCK); + } catch (IOException e) { + final String errorMessage = "Failed to write the START_OF_BLOCK to the Socket's OutputStream"; + if (isAcknowledgementWriter()) { + throw new MllpAcknowledgementDeliveryException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } else { + throw new MllpWriteException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } + } + + if (isAcknowledgementWriter()) { + if (hl7AcknowledgementBytes == null) { + log.warn("HL7 Acknowledgement payload is null - sending empty MLLP payload"); + } else if (hl7AcknowledgementBytes.length <= 0) { + log.warn("HL7 Acknowledgement payload is empty - sending empty MLLP payload"); + } else { + try { + socketOutputStream.write(hl7AcknowledgementBytes); + } catch (IOException ioEx) { + throw new MllpAcknowledgementDeliveryException("Failed to write the HL7 Acknowledgement payload to the Socket's OutputStream", hl7MessageBytes, hl7AcknowledgementBytes, ioEx); + } + } + } else { + if (hl7MessageBytes == null) { + log.warn("HL7 Message payload is null - sending empty MLLP payload"); + } else if (hl7MessageBytes.length <= 0) { + log.warn("HL7 Message payload is empty - sending empty MLLP payload"); + } else { + try { + socketOutputStream.write(hl7MessageBytes); + } catch (IOException ioEx) { + throw new MllpWriteException("Failed to write the HL7 Message payload to the Socket's OutputStream", hl7MessageBytes, hl7AcknowledgementBytes, ioEx); + } + } + } + + try { + socketOutputStream.write(PAYLOAD_TERMINATOR); + socketOutputStream.flush(); + } catch (IOException e) { + final String errorMessage = "Failed to write the END_OF_BLOCK and END_OF_DATA to the Socket's OutputStream"; + if (isAcknowledgementWriter()) { + throw new MllpAcknowledgementDeliveryException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } else { + throw new MllpWriteException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); + } + } + + } + + public boolean isAcknowledgementWriter() { + return acknowledgementWriter; + } +}