Repository: camel Updated Branches: refs/heads/camel-2.18.x 0ed067b1e -> ba8f3aa04 refs/heads/master 911218438 -> 58e36d63b
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java index fe1ad40..880761c 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java @@ -19,6 +19,7 @@ package org.apache.camel.test.junit.rule.mllp; import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.BindException; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -45,8 +46,6 @@ import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK; * * The server can be configured to simulate a large number * of error conditions. - * - * TODO: This needs to be looked at - it may be orphaning threads */ public class MllpServerResource extends ExternalResource { Logger log = LoggerFactory.getLogger(this.getClass()); @@ -59,6 +58,12 @@ public class MllpServerResource extends ExternalResource { boolean active = true; + int delayBeforeStartOfBlock; + int delayBeforeAcknowledgement; + int delayDuringAcknowledgement; + int delayAfterAcknowledgement; + int delayAfterEndOfBlock; + int excludeStartOfBlockModulus; int excludeEndOfBlockModulus; int excludeEndOfDataModulus; @@ -67,8 +72,11 @@ public class MllpServerResource extends ExternalResource { int sendOutOfBandDataModulus; - int disconnectBeforeAcknowledgementModulus; - int disconnectAfterAcknowledgementModulus; + int closeSocketBeforeAcknowledgementModulus; + int closeSocketAfterAcknowledgementModulus; + + int resetSocketBeforeAcknowledgementModulus; + int resetSocketAfterAcknowledgementModulus; int sendApplicationRejectAcknowledgementModulus; int sendApplicationErrorAcknowledgementModulus; @@ -76,6 +84,8 @@ public class MllpServerResource extends ExternalResource { Pattern sendApplicationRejectAcknowledgementPattern; Pattern sendApplicationErrorAcknowledgementPattern; + String acknowledgementString; + ServerSocketThread serverSocketThread; public MllpServerResource() { @@ -166,6 +176,46 @@ public class MllpServerResource extends ExternalResource { serverSocketThread.interrupt(); } + public int getDelayBeforeStartOfBlock() { + return delayBeforeStartOfBlock; + } + + public void setDelayBeforeStartOfBlock(int delayBeforeStartOfBlock) { + this.delayBeforeStartOfBlock = delayBeforeStartOfBlock; + } + + public int getDelayBeforeAcknowledgement() { + return delayBeforeAcknowledgement; + } + + public void setDelayBeforeAcknowledgement(int delayBeforeAcknowledgement) { + this.delayBeforeAcknowledgement = delayBeforeAcknowledgement; + } + + public int getDelayDuringAcknowledgement() { + return delayDuringAcknowledgement; + } + + public void setDelayDuringAcknowledgement(int delayDuringAcknowledgement) { + this.delayDuringAcknowledgement = delayDuringAcknowledgement; + } + + public int getDelayAfterAcknowledgement() { + return delayAfterAcknowledgement; + } + + public void setDelayAfterAcknowledgement(int delayAfterAcknowledgement) { + this.delayAfterAcknowledgement = delayAfterAcknowledgement; + } + + public int getDelayAfterEndOfBlock() { + return delayAfterEndOfBlock; + } + + public void setDelayAfterEndOfBlock(int delayAfterEndOfBlock) { + this.delayAfterEndOfBlock = delayAfterEndOfBlock; + } + public boolean sendApplicationRejectAcknowledgement(String hl7Message) { return evaluatePatten(hl7Message, this.sendApplicationErrorAcknowledgementPattern); } @@ -198,12 +248,20 @@ public class MllpServerResource extends ExternalResource { return evaluateModulus(messageCount, excludeEndOfDataModulus); } - public boolean disconnectBeforeAcknowledgement(int messageCount) { - return evaluateModulus(messageCount, disconnectBeforeAcknowledgementModulus); + public boolean closeSocketBeforeAcknowledgement(int messageCount) { + return evaluateModulus(messageCount, closeSocketBeforeAcknowledgementModulus); } - public boolean disconnectAfterAcknowledgement(int messageCount) { - return evaluateModulus(messageCount, disconnectAfterAcknowledgementModulus); + public boolean closeSocketAfterAcknowledgement(int messageCount) { + return evaluateModulus(messageCount, closeSocketAfterAcknowledgementModulus); + } + + public boolean resetSocketBeforeAcknowledgement(int messageCount) { + return evaluateModulus(messageCount, resetSocketBeforeAcknowledgementModulus); + } + + public boolean resetSocketAfterAcknowledgement(int messageCount) { + return evaluateModulus(messageCount, resetSocketAfterAcknowledgementModulus); } public boolean sendOutOfBandData(int messageCount) { @@ -369,27 +427,51 @@ public class MllpServerResource extends ExternalResource { } } - public int getDisconnectBeforeAcknowledgementModulus() { - return disconnectBeforeAcknowledgementModulus; + public int getCloseSocketBeforeAcknowledgementModulus() { + return closeSocketBeforeAcknowledgementModulus; + } + + public void setCloseSocketBeforeAcknowledgementModulus(int closeSocketBeforeAcknowledgementModulus) { + if (0 > closeSocketBeforeAcknowledgementModulus) { + this.closeSocketBeforeAcknowledgementModulus = 0; + } else { + this.closeSocketBeforeAcknowledgementModulus = closeSocketBeforeAcknowledgementModulus; + } + } + + public int getCloseSocketAfterAcknowledgementModulus() { + return closeSocketAfterAcknowledgementModulus; + } + + public void setCloseSocketAfterAcknowledgementModulus(int closeSocketAfterAcknowledgementModulus) { + if (0 > closeSocketAfterAcknowledgementModulus) { + this.closeSocketAfterAcknowledgementModulus = 0; + } else { + this.closeSocketAfterAcknowledgementModulus = closeSocketAfterAcknowledgementModulus; + } + } + + public int getResetSocketBeforeAcknowledgementModulus() { + return resetSocketBeforeAcknowledgementModulus; } - public void setDisconnectBeforeAcknowledgementModulus(int disconnectBeforeAcknowledgementModulus) { - if (0 > disconnectBeforeAcknowledgementModulus) { - this.disconnectBeforeAcknowledgementModulus = 0; + public void setResetSocketBeforeAcknowledgementModulus(int resetSocketBeforeAcknowledgementModulus) { + if (0 > resetSocketBeforeAcknowledgementModulus) { + this.resetSocketBeforeAcknowledgementModulus = 0; } else { - this.disconnectBeforeAcknowledgementModulus = disconnectBeforeAcknowledgementModulus; + this.resetSocketBeforeAcknowledgementModulus = resetSocketBeforeAcknowledgementModulus; } } - public int getDisconnectAfterAcknowledgementModulus() { - return disconnectAfterAcknowledgementModulus; + public int getResetSocketAfterAcknowledgementModulus() { + return resetSocketAfterAcknowledgementModulus; } - public void setDisconnectAfterAcknowledgementModulus(int disconnectAfterAcknowledgementModulus) { - if (0 > disconnectAfterAcknowledgementModulus) { - this.disconnectAfterAcknowledgementModulus = 0; + public void setResetSocketAfterAcknowledgementModulus(int resetSocketAfterAcknowledgementModulus) { + if (0 > resetSocketAfterAcknowledgementModulus) { + this.resetSocketAfterAcknowledgementModulus = 0; } else { - this.disconnectAfterAcknowledgementModulus = disconnectAfterAcknowledgementModulus; + this.resetSocketAfterAcknowledgementModulus = resetSocketAfterAcknowledgementModulus; } } @@ -433,6 +515,14 @@ public class MllpServerResource extends ExternalResource { this.sendApplicationErrorAcknowledgementPattern = sendApplicationErrorAcknowledgementPattern; } + public String getAcknowledgementString() { + return acknowledgementString; + } + + public void setAcknowledgementString(String acknowledgementString) { + this.acknowledgementString = acknowledgementString; + } + public ServerSocketThread getServerSocketThread() { return serverSocketThread; } @@ -441,41 +531,95 @@ public class MllpServerResource extends ExternalResource { this.serverSocketThread = serverSocketThread; } - void closeConnection(Socket socket) { - if (null != socket) { - if (!socket.isClosed()) { - try { - socket.shutdownInput(); - } catch (Exception ex) { - log.warn("Exception encountered shutting down the input stream on the client socket", ex); - } - - try { - socket.shutdownOutput(); - } catch (Exception ex) { - log.warn("Exception encountered shutting down the output stream on the client socket", ex); - } + public void closeClientConnections() { + if (serverSocketThread != null) { + serverSocketThread.closeClientConnections(); + } + } - try { - socket.close(); - } catch (Exception ex) { - log.warn("Exception encountered closing the client socket", ex); - } - } + public void resetClientConnections() { + if (serverSocketThread != null) { + serverSocketThread.resetClientConnections(); } } - void resetConnection(Socket socket) { - if (null != socket && !socket.isClosed()) { - try { - socket.setSoLinger(true, 0); - } catch (SocketException socketEx) { - log.debug("SocketException encountered setting SO_LINGER to 0 on the socket to force a reset - ignoring", socketEx); - } finally { - closeConnection(socket); + /** + * Generates a HL7 Application Acknowledgement + * + * @param hl7Message HL7 message that is being acknowledged + * @param acknowledgementCode AA, AE or AR + * @return a HL7 Application Acknowledgement + */ + protected String generateAcknowledgement(String hl7Message, String acknowledgementCode) { + final String defaulNackMessage = + "MSH|^~\\&|||||||NACK||P|2.2" + SEGMENT_DELIMITER + + "MSA|AR|" + SEGMENT_DELIMITER + + MESSAGE_TERMINATOR; + + if (hl7Message == null) { + log.error("Invalid HL7 message for parsing operation. Please check your inputs"); + return defaulNackMessage; + } + + if (!("AA".equals(acknowledgementCode) || "AE".equals(acknowledgementCode) || "AR".equals(acknowledgementCode))) { + throw new IllegalArgumentException("Acknowledgemnt Code must be AA, AE or AR: " + acknowledgementCode); + } + + String messageControlId; + + int endOfMshSegment = hl7Message.indexOf(SEGMENT_DELIMITER); + if (-1 != endOfMshSegment) { + String mshSegment = hl7Message.substring(0, endOfMshSegment); + char fieldSeparator = mshSegment.charAt(3); + String fieldSeparatorPattern = Pattern.quote("" + fieldSeparator); + String[] mshFields = mshSegment.split(fieldSeparatorPattern); + if (mshFields.length == 0) { + log.error("Failed to split MSH Segment into fields"); + } else { + StringBuilder ackBuilder = new StringBuilder(mshSegment.length() + 25); + // Build the MSH Segment + ackBuilder + .append(mshFields[0]).append(fieldSeparator) + .append(mshFields[1]).append(fieldSeparator) + .append(mshFields[4]).append(fieldSeparator) + .append(mshFields[5]).append(fieldSeparator) + .append(mshFields[2]).append(fieldSeparator) + .append(mshFields[3]).append(fieldSeparator) + .append(mshFields[6]).append(fieldSeparator) + .append(mshFields[7]).append(fieldSeparator) + .append("ACK") + .append(mshFields[8].substring(3)); + for (int i = 9; i < mshFields.length; ++i) { + ackBuilder.append(fieldSeparator).append(mshFields[i]); + } + // Empty fields at the end are not preserved by String.split, so preserve them + int emptyFieldIndex = mshSegment.length() - 1; + if (fieldSeparator == mshSegment.charAt(mshSegment.length() - 1)) { + ackBuilder.append(fieldSeparator); + while (emptyFieldIndex >= 1 && mshSegment.charAt(emptyFieldIndex) == mshSegment.charAt(emptyFieldIndex - 1)) { + ackBuilder.append(fieldSeparator); + --emptyFieldIndex; + } + } + ackBuilder.append(SEGMENT_DELIMITER); + + // Build the MSA Segment + ackBuilder + .append("MSA").append(fieldSeparator) + .append(acknowledgementCode).append(fieldSeparator) + .append(mshFields[9]).append(fieldSeparator) + .append(SEGMENT_DELIMITER); + + // Terminate the message + ackBuilder.append(MESSAGE_TERMINATOR); + + return ackBuilder.toString(); } + } else { + log.error("Failed to find the end of the MSH Segment"); } + return null; } /** @@ -566,6 +710,22 @@ public class MllpServerResource extends ExternalResource { log.info("Opened TCP Listener on port {}", serverSocket.getLocalPort()); } + void closeClientConnections() { + if (clientSocketThreads != null) { + for (ClientSocketThread clientSocketThread : clientSocketThreads) { + clientSocketThread.closeConnection(); + } + } + } + + void resetClientConnections() { + if (clientSocketThreads != null) { + for (ClientSocketThread clientSocketThread : clientSocketThreads) { + clientSocketThread.resetConnection(); + } + } + } + /** * Accept TCP connections and create ClientSocketThreads for them */ @@ -586,7 +746,16 @@ public class MllpServerResource extends ExternalResource { if (null == clientSocket) { continue; } else if (!clientSocket.isClosed()) { - resetConnection(clientSocket); + try { + clientSocket.setSoLinger(true, 0); + } catch (SocketException soLingerEx) { + log.warn("Ignoring SocketException encountered when setting SO_LINGER in preparation of resetting client Socket", soLingerEx); + } + try { + clientSocket.close(); + } catch (IOException ioEx) { + log.warn("Ignoring IOException encountered when resetting client Socket", ioEx); + } continue; } else { throw new MllpJUnitResourceException("Unexpected SocketException encountered accepting client connection", socketEx); @@ -685,6 +854,10 @@ public class MllpServerResource extends ExternalResource { } super.interrupt(); } + + public void close() { + + } } /** @@ -711,6 +884,35 @@ public class MllpServerResource extends ExternalResource { this.clientSocket = clientSocket; } + void closeConnection() { + if (clientSocket != null && !clientSocket.isClosed()) { + try { + clientSocket.close(); + } catch (IOException ioEx) { + log.warn("Ignoring IOException encountered when closing client Socket", ioEx); + } finally { + clientSocket = null; + } + } + } + + void resetConnection() { + if (clientSocket != null && !clientSocket.isClosed()) { + try { + clientSocket.setSoLinger(true, 0); + } catch (SocketException socketEx) { + log.warn("Ignoring SocketException encountered when setting SO_LINGER in preparation of resetting client Socket", socketEx); + } + try { + clientSocket.close(); + } catch (IOException ioEx) { + log.warn("Ignoring IOException encountered when resetting client Socket", ioEx); + } finally { + clientSocket = null; + } + } + } + /** * Receives HL7 messages and replies with HL7 Acknowledgements. * @@ -738,12 +940,30 @@ public class MllpServerResource extends ExternalResource { } catch (Exception unexpectedEx) { throw new MllpJUnitResourceException("Unexpected exception encounted getting input stream", unexpectedEx); } - String parsedHL7Message = getMessage(instream); + String parsedHL7Message; + try { + parsedHL7Message = getMessage(instream); + } catch (SocketTimeoutException timeoutEx) { + log.info("Waiting for message from client"); + continue; + } if (null != parsedHL7Message && parsedHL7Message.length() > 0) { ++messageCounter; - if (disconnectBeforeAcknowledgement(messageCounter)) { - log.warn("Disconnecting before sending acknowledgement"); + if (closeSocketBeforeAcknowledgement(messageCounter)) { + log.warn("Closing socket before sending acknowledgement"); + clientSocket.shutdownInput(); + clientSocket.shutdownOutput(); + clientSocket.close(); + break; + } + if (resetSocketBeforeAcknowledgement(messageCounter)) { + log.warn("Resetting socket before sending acknowledgement"); + try { + clientSocket.setSoLinger(true, 0); + } catch (IOException ioEx) { + log.warn("Ignoring IOException encountered setting SO_LINGER when prepareing to reset socket", ioEx); + } clientSocket.shutdownInput(); clientSocket.shutdownOutput(); clientSocket.close(); @@ -752,39 +972,67 @@ public class MllpServerResource extends ExternalResource { String acknowledgmentMessage; - if (sendApplicationErrorAcknowledgement(messageCounter) || sendApplicationErrorAcknowledgement(parsedHL7Message)) { - acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message, "AE"); - } else if (sendApplicationRejectAcknowledgement(messageCounter) || sendApplicationRejectAcknowledgement(parsedHL7Message)) { - acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message, "AR"); + if (acknowledgementString == null) { + if (sendApplicationErrorAcknowledgement(messageCounter) || sendApplicationErrorAcknowledgement(parsedHL7Message)) { + acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message, "AE"); + } else if (sendApplicationRejectAcknowledgement(messageCounter) || sendApplicationRejectAcknowledgement(parsedHL7Message)) { + acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message, "AR"); + } else { + acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message); + } } else { - acknowledgmentMessage = generateAcknowledgementMessage(parsedHL7Message); - + acknowledgmentMessage = acknowledgementString; } + BufferedOutputStream outstream = new BufferedOutputStream(clientSocket.getOutputStream()); if (sendOutOfBandData(messageCounter)) { byte[] outOfBandDataBytes = "Out Of Band Hl7MessageGenerator".getBytes(); outstream.write(outOfBandDataBytes, 0, outOfBandDataBytes.length); - } + if (excludeStartOfBlock(messageCounter)) { - log.warn("NOT sending bMLLP_ENVELOPE_START_OF_BLOCK"); + log.warn("NOT sending START_OF_BLOCK"); } else { outstream.write(START_OF_BLOCK); + if (delayBeforeStartOfBlock > 0) { + uncheckedSleep(delayBeforeStartOfBlock); + uncheckedFlush(outstream); + } } if (excludeAcknowledgement(messageCounter)) { log.info("NOT sending Acknowledgement body"); } else { + if (delayBeforeAcknowledgement > 0) { + uncheckedSleep(delayBeforeAcknowledgement); + } log.debug("Buffering Acknowledgement\n\t{}", acknowledgmentMessage.replace('\r', '\n')); byte[] ackBytes = acknowledgmentMessage.getBytes(); - outstream.write(ackBytes, 0, ackBytes.length); + if (delayDuringAcknowledgement > 0) { + int firstHalf = ackBytes.length / 2; + outstream.write(ackBytes, 0, firstHalf); + uncheckedFlush(outstream); + uncheckedSleep(delayDuringAcknowledgement); + outstream.write(ackBytes, firstHalf, ackBytes.length - firstHalf); + uncheckedFlush(outstream); + } else { + outstream.write(ackBytes, 0, ackBytes.length); + } + if (delayAfterAcknowledgement > 0) { + uncheckedFlush(outstream); + uncheckedSleep(delayAfterAcknowledgement); + } } if (excludeEndOfBlock(messageCounter)) { log.warn("NOT sending bMLLP_ENVELOPE_END_OF_BLOCK"); } else { outstream.write(END_OF_BLOCK); + if (delayAfterEndOfBlock > 0) { + uncheckedFlush(outstream); + uncheckedSleep(delayAfterEndOfBlock); + } } if (excludeEndOfData(messageCounter)) { @@ -794,9 +1042,9 @@ public class MllpServerResource extends ExternalResource { } log.debug("Writing Acknowledgement\n\t{}", acknowledgmentMessage.replace('\r', '\n')); - outstream.flush(); + uncheckedFlush(outstream); - if (disconnectAfterAcknowledgement(messageCounter)) { + if (closeSocketAfterAcknowledgement(messageCounter)) { log.info("Closing Client"); clientSocket.shutdownInput(); clientSocket.shutdownOutput(); @@ -806,16 +1054,18 @@ public class MllpServerResource extends ExternalResource { } } } catch (IOException e) { - String errorMessage = "Error whiling reading and writing to clientSocket"; + String errorMessage = "Error while reading and writing from clientSocket"; log.error(errorMessage, e); throw new MllpJUnitResourceException(errorMessage, e); } finally { - try { - clientSocket.close(); - } catch (IOException e) { - String errorMessage = "Error whiling attempting to close to client Socket"; - log.error(errorMessage, e); - throw new MllpJUnitResourceException(errorMessage, e); + if (clientSocket != null) { + try { + clientSocket.close(); + } catch (IOException e) { + String errorMessage = "Error while attempting to close to client Socket"; + log.error(errorMessage, e); + throw new MllpJUnitResourceException(errorMessage, e); + } } } @@ -829,10 +1079,8 @@ public class MllpServerResource extends ExternalResource { * @return the MLLP payload * @throws IOException when the underlying Java Socket calls raise these exceptions */ - // TODO: Enhance this to detect non-HL7 data (i.e. look for MSH after START_OF_BLOCK) public String getMessage(InputStream anInputStream) throws IOException { try { - // TODO: Enhance this to read a bunch of characters and log, rather than log them one at a time boolean waitingForStartOfBlock = true; while (waitingForStartOfBlock) { int potentialStartCharacter = anInputStream.read(); @@ -847,14 +1095,16 @@ public class MllpServerResource extends ExternalResource { } } } catch (SocketException socketEx) { - if (clientSocket.isClosed()) { - log.info("Client socket closed while waiting for MLLP_ENVELOPE_START_OF_BLOCK"); - } else if (clientSocket.isConnected()) { - log.info("SocketException encountered while waiting for MLLP_ENVELOPE_START_OF_BLOCK"); - resetConnection(clientSocket); - } else { - log.error("Unable to read from socket stream when expected bMLLP_ENVELOPE_START_OF_BLOCK - resetting connection ", socketEx); - resetConnection(clientSocket); + if (clientSocket != null) { + if (clientSocket.isClosed()) { + log.info("Client socket closed while waiting for MLLP_ENVELOPE_START_OF_BLOCK"); + } else if (clientSocket.isConnected()) { + log.info("SocketException encountered while waiting for MLLP_ENVELOPE_START_OF_BLOCK"); + resetConnection(); + } else { + log.error("Unable to read from socket stream when expected bMLLP_ENVELOPE_START_OF_BLOCK - resetting connection ", socketEx); + resetConnection(); + } } return null; } @@ -907,75 +1157,7 @@ public class MllpServerResource extends ExternalResource { * @return a HL7 Application Acknowledgement */ private String generateAcknowledgementMessage(String hl7Message, String acknowledgementCode) { - final String defaulNackMessage = - "MSH|^~\\&|||||||NACK||P|2.2" + SEGMENT_DELIMITER - + "MSA|AR|" + SEGMENT_DELIMITER - + MESSAGE_TERMINATOR; - - if (hl7Message == null) { - log.error("Invalid HL7 message for parsing operation. Please check your inputs"); - return defaulNackMessage; - } - - if (!("AA".equals(acknowledgementCode) || "AE".equals(acknowledgementCode) || "AR".equals(acknowledgementCode))) { - throw new IllegalArgumentException("Acknowledgemnt Code must be AA, AE or AR: " + acknowledgementCode); - } - - String messageControlId; - - int endOfMshSegment = hl7Message.indexOf(SEGMENT_DELIMITER); - if (-1 != endOfMshSegment) { - String mshSegment = hl7Message.substring(0, endOfMshSegment); - char fieldSeparator = mshSegment.charAt(3); - String fieldSeparatorPattern = Pattern.quote("" + fieldSeparator); - String[] mshFields = mshSegment.split(fieldSeparatorPattern); - if (mshFields.length == 0) { - log.error("Failed to split MSH Segment into fields"); - } else { - StringBuilder ackBuilder = new StringBuilder(mshSegment.length() + 25); - // Build the MSH Segment - ackBuilder - .append(mshFields[0]).append(fieldSeparator) - .append(mshFields[1]).append(fieldSeparator) - .append(mshFields[4]).append(fieldSeparator) - .append(mshFields[5]).append(fieldSeparator) - .append(mshFields[2]).append(fieldSeparator) - .append(mshFields[3]).append(fieldSeparator) - .append(mshFields[6]).append(fieldSeparator) - .append(mshFields[7]).append(fieldSeparator) - .append("ACK") - .append(mshFields[8].substring(3)); - for (int i = 9; i < mshFields.length; ++i) { - ackBuilder.append(fieldSeparator).append(mshFields[i]); - } - // Empty fields at the end are not preserved by String.split, so preserve them - int emptyFieldIndex = mshSegment.length() - 1; - if (fieldSeparator == mshSegment.charAt(mshSegment.length() - 1)) { - ackBuilder.append(fieldSeparator); - while (emptyFieldIndex >= 1 && mshSegment.charAt(emptyFieldIndex) == mshSegment.charAt(emptyFieldIndex - 1)) { - ackBuilder.append(fieldSeparator); - --emptyFieldIndex; - } - } - ackBuilder.append(SEGMENT_DELIMITER); - - // Build the MSA Segment - ackBuilder - .append("MSA").append(fieldSeparator) - .append(acknowledgementCode).append(fieldSeparator) - .append(mshFields[9]).append(fieldSeparator) - .append(SEGMENT_DELIMITER); - - // Terminate the message - ackBuilder.append(MESSAGE_TERMINATOR); - - return ackBuilder.toString(); - } - } else { - log.error("Failed to find the end of the MSH Segment"); - } - - return null; + return generateAcknowledgement(hl7Message, acknowledgementCode); } @Override @@ -989,6 +1171,23 @@ public class MllpServerResource extends ExternalResource { } super.interrupt(); } + + private void uncheckedSleep(long milliseconds) { + try { + Thread.sleep(milliseconds); + } catch (InterruptedException e) { + log.warn("Sleep interrupted", e); + } + + } + + private void uncheckedFlush(OutputStream outputStream) { + try { + outputStream.flush(); + } catch (IOException e) { + log.warn("Ignoring exception caught while flushing output stream", e); + } + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java b/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java new file mode 100644 index 0000000..3959b43 --- /dev/null +++ b/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.test.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class PayloadBuilder { + ByteArrayOutputStream builderStream = new ByteArrayOutputStream(); + + public PayloadBuilder() { + } + + public PayloadBuilder(byte b) throws IOException { + this.append(b); + } + + public PayloadBuilder(byte[] bytes) throws IOException { + this.append(bytes); + } + + public PayloadBuilder(char... chars) throws IOException { + this.append(chars); + } + + public PayloadBuilder(String... strings) throws IOException { + this.append(strings); + } + + public PayloadBuilder append(byte b) throws IOException { + builderStream.write(b); + + return this; + } + + public PayloadBuilder append(byte[] bytes) throws IOException { + builderStream.write(bytes); + + return this; + } + + public PayloadBuilder append(char... chars) throws IOException { + if (chars != null) { + for (char c : chars) { + builderStream.write(c); + } + } + + return this; + } + + public PayloadBuilder append(String... strings) throws IOException { + if (strings != null) { + for (String s : strings) { + builderStream.write(s.getBytes()); + } + } + + return this; + } + + public PayloadBuilder append(byte[] payload, int startPosition, int length) throws IOException { + builderStream.write(payload, startPosition, length); + + return this; + } + + public byte[] build() { + byte[] answer = builderStream.toByteArray(); + + builderStream.reset(); + + return answer; + } + + public static byte[] build(byte b) { + try { + return new PayloadBuilder(b).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(byte) failure", e); + } + } + + public static byte[] build(byte b, byte... bytes) { + try { + return new PayloadBuilder(b).append(bytes).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(byte) failure", e); + } + } + + public static byte[] build(byte[] bytes) { + try { + return new PayloadBuilder(bytes).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(byte[]) failure", e); + } + } + + public static byte[] build(char c) { + try { + return new PayloadBuilder(c).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(char...) failure", e); + } + } + + public static byte[] build(char c, char... chars) { + try { + return new PayloadBuilder(c).append(chars).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(char...) failure", e); + } + } + + public static byte[] build(char[] chars) { + try { + return new PayloadBuilder(chars).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(char...) failure", e); + } + } + + public static byte[] build(String s) { + try { + return new PayloadBuilder(s).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(String) failure", e); + } + } + + public static byte[] build(String[] strings) { + try { + return new PayloadBuilder(strings).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(String[]) failure", e); + } + } + + public static byte[] build(char start, String s) { + try { + return new PayloadBuilder(start) + .append(s) + .build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(String) failure", e); + } + } + + public static byte[] build(char start, String s, char... end) { + try { + return new PayloadBuilder(start) + .append(s) + .append(end) + .build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(char, String, char...) failure", e); + } + } + + public static byte[] build(char start, byte[] bytes, char... end) { + try { + return new PayloadBuilder(start) + .append(bytes) + .append(end).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(char, byte[], char...) failure", e); + } + } + + public static byte[] build(String s, char... end) { + try { + return new PayloadBuilder(s) + .append(end).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(String, char...) failure", e); + } + } + + public static byte[] build(byte[] bytes, char... end) { + try { + return new PayloadBuilder(bytes) + .append(end).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(byte[], char...) failure", e); + } + } + + public static byte[] build(byte[] bytes, String s) { + try { + return new PayloadBuilder(bytes) + .append(s).build(); + } catch (IOException e) { + throw new RuntimeException("PayloadBuilder.build(byte[], String) failure", e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/test/resources/log4j2.properties b/components/camel-mllp/src/test/resources/log4j2.properties index 8c321d2..e790099 100644 --- a/components/camel-mllp/src/test/resources/log4j2.properties +++ b/components/camel-mllp/src/test/resources/log4j2.properties @@ -19,14 +19,16 @@ appender.file.type = File appender.file.name = file appender.file.fileName = target/camel-mllp-test.log appender.file.layout.type = PatternLayout -appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +appender.file.layout.pattern = %d %X{camel.contextId} [%t] %-5p %-30.30c{1} - %m%n + appender.out.type = Console appender.out.name = out appender.out.layout.type = PatternLayout appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n + rootLogger.level = INFO rootLogger.appenderRef.file.ref = file -# loggers = mllp -# logger.mllp.name = org.apache.camel.component.mllp -# logger.mllp.level = DEBUG +loggers = mllp +logger.mllp.name = org.apache.camel.component.mllp +# logger.mllp.level = TRACE