This is an automated email from the ASF dual-hosted git repository.

quinn pushed a commit to branch camel-2.21.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5fa2f4f9ba203cca61687b34bc8b3b2d32c291d2 Author: Quinn Stevenson <qu...@apache.org> AuthorDate: Fri Mar 23 07:56:07 2018 -0600 CAMEL-12325: logging cleanup --- .../apache/camel/component/mllp/MllpEndpoint.java | 3 +- .../component/mllp/MllpTcpClientProducer.java | 123 +++++++++++---------- .../component/mllp/MllpTcpServerConsumer.java | 36 +++--- .../component/mllp/internal/MllpSocketBuffer.java | 87 ++++++++------- .../mllp/internal/MllpSocketBufferTest.java | 16 +++ .../mllp/internal/MllpSocketBufferWriteTest.java | 12 +- 6 files changed, 154 insertions(+), 123 deletions(-) diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java index dbadfa8..c05b8b6 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java @@ -53,8 +53,7 @@ import org.slf4j.LoggerFactory; * <p/> */ @ManagedResource(description = "MLLP Endpoint") -// @UriEndpoint(scheme = "mllp", firstVersion = "2.17.0", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp") -@UriEndpoint(scheme = "mllp", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp") +@UriEndpoint(scheme = "mllp", firstVersion = "2.17.0", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp") public class MllpEndpoint extends DefaultEndpoint { // Use constants from MllpProtocolConstants @Deprecated() diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java index df5705a..57bd1d3 100644 --- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java @@ -18,30 +18,33 @@ package org.apache.camel.component.mllp; import java.io.IOException; + import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketTimeoutException; -import java.nio.charset.Charset; + import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Message; + import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; + import org.apache.camel.component.mllp.internal.Hl7Util; import org.apache.camel.component.mllp.internal.MllpSocketBuffer; + import org.apache.camel.impl.DefaultProducer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; /** @@ -49,12 +52,12 @@ import org.slf4j.MDC; */ @ManagedResource(description = "MLLP Producer") public class MllpTcpClientProducer extends DefaultProducer implements Runnable { - Socket socket; - + final Logger log; final MllpSocketBuffer mllpBuffer; + Socket socket; + ScheduledExecutorService idleTimeoutExecutor; - // long lastProcessCallTicks = -1; private String cachedLocalAddress; private String cachedRemoteAddress; @@ -62,6 +65,9 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { public MllpTcpClientProducer(MllpEndpoint endpoint) throws SocketException { super(endpoint); + + log = LoggerFactory.getLogger(String.format("%s.%s.%d", this.getClass().getName(), endpoint.getHostname(), endpoint.getPort())); + log.trace("Constructing 
MllpTcpClientProducer for endpoint URI {}", endpoint.getEndpointUri()); mllpBuffer = new MllpSocketBuffer(endpoint); @@ -127,7 +133,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { @Override public synchronized void process(Exchange exchange) throws MllpException { - log.trace("Processing Exchange {} for {}", exchange.getExchangeId(), socket); + log.trace("process({}) [{}] - entering", exchange.getExchangeId(), socket); getEndpoint().updateLastConnectionActivityTicks(); Message message = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); @@ -150,7 +156,8 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { byte[] hl7MessageBytes = null; Object messageBody = message.getBody(); if (messageBody == null) { - exchange.setException(new MllpInvalidMessageException("message body is null", hl7MessageBytes)); + String exceptionMessage = String.format("process(%s) [%s] - message body is null", exchange.getExchangeId(), socket); + exchange.setException(new MllpInvalidMessageException(exceptionMessage, hl7MessageBytes)); return; } else if (messageBody instanceof byte[]) { hl7MessageBytes = (byte[]) messageBody; @@ -162,99 +169,94 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { } } - log.debug("Sending message to external system {}", socket); + log.debug("process({}) [{}] - sending message to external system", exchange.getExchangeId(), socket); try { mllpBuffer.setEnvelopedMessage(hl7MessageBytes); mllpBuffer.writeTo(socket); } catch (MllpSocketException writeEx) { // Connection may have been reset - try one more time - log.debug("Exception encountered writing payload to {} - attempting reconnect", writeEx, socket); + log.debug("process({}) [{}] - exception encountered writing payload - attempting reconnect", exchange.getExchangeId(), socket, writeEx); try { checkConnection(); - log.trace("Reconnected succeeded - resending payload to {}", socket); + 
log.trace("process({}) [{}] - reconnected succeeded - resending payload", exchange.getExchangeId(), socket); try { mllpBuffer.writeTo(socket); } catch (MllpSocketException retryWriteEx) { - log.warn("Exception encountered attempting to write payload to {} after reconnect - sending original exception to exchange", socket, retryWriteEx); - exchange.setException(new MllpWriteException("Exception encountered writing payload after reconnect", mllpBuffer.toByteArrayAndReset(), retryWriteEx)); + String exceptionMessage = String.format("process(%s) [%s] - exception encountered attempting to write payload after reconnect", exchange.getExchangeId(), socket); + log.warn(exceptionMessage, retryWriteEx); + exchange.setException(new MllpWriteException(exceptionMessage, mllpBuffer.toByteArrayAndReset(), retryWriteEx)); } } catch (IOException reconnectEx) { - log.warn("Exception encountered attempting to reconnect - sending original exception to exchange", reconnectEx); - exchange.setException(new MllpWriteException("Exception encountered writing payload", mllpBuffer.toByteArrayAndReset(), writeEx)); + String exceptionMessage = String.format("process(%s) [%s] - exception encountered attempting to reconnect", exchange.getExchangeId(), socket); + log.warn(exceptionMessage, reconnectEx); + exchange.setException(new MllpWriteException(exceptionMessage, mllpBuffer.toByteArrayAndReset(), writeEx)); mllpBuffer.resetSocket(socket); } } if (exchange.getException() == null) { - log.debug("Reading acknowledgement from external system {}", socket); + log.debug("process({}) [{}] - reading acknowledgement from external system", exchange.getExchangeId(), socket); try { mllpBuffer.reset(); mllpBuffer.readFrom(socket); } catch (MllpSocketException receiveAckEx) { // Connection may have been reset - try one more time - log.debug("Exception encountered reading acknowledgement from {} - attempting reconnect", socket, receiveAckEx); + log.debug("process({}) [{}] - exception encountered reading 
acknowledgement - attempting reconnect", exchange.getExchangeId(), socket, receiveAckEx); try { checkConnection(); } catch (IOException reconnectEx) { - log.warn("Exception encountered attempting to reconnect after acknowledgement read failure - sending original acknowledgement exception to exchange", reconnectEx); - exchange.setException(new MllpAcknowledgementReceiveException("Exception encountered reading acknowledgement", hl7MessageBytes, receiveAckEx)); + String exceptionMessage = String.format("process(%s) [%s] - exception encountered attempting to reconnect after acknowledgement read failure", + exchange.getExchangeId(), socket); + log.warn(exceptionMessage, reconnectEx); + exchange.setException(new MllpAcknowledgementReceiveException(exceptionMessage, hl7MessageBytes, receiveAckEx)); mllpBuffer.resetSocket(socket); } if (exchange.getException() == null) { - log.trace("Reconnected succeeded - resending payload to {}", socket); + log.trace("process({}) [{}] - resending payload after successful reconnect", exchange.getExchangeId(), socket); try { mllpBuffer.setEnvelopedMessage(hl7MessageBytes); mllpBuffer.writeTo(socket); } catch (MllpSocketException writeRetryEx) { - log.warn("Exception encountered attempting to write payload to {} after read failure and successful reconnect - sending original exception to exchange", - socket, writeRetryEx); - exchange.setException(new MllpWriteException("Exception encountered writing payload after read failure and reconnect", hl7MessageBytes, receiveAckEx)); + String exceptionMessage = String.format("process(%s) [%s] - exception encountered attempting to write payload after read failure and successful reconnect", + exchange.getExchangeId(), socket); + log.warn(exceptionMessage, writeRetryEx); + exchange.setException(new MllpWriteException(exceptionMessage, hl7MessageBytes, receiveAckEx)); } if (exchange.getException() == null) { - log.trace("Resend succeeded - reading acknowledgement from {}", socket); + 
log.trace("process({}) [{}] - resend succeeded - reading acknowledgement from {}", exchange.getExchangeId(), socket); try { mllpBuffer.reset(); mllpBuffer.readFrom(socket); } catch (MllpSocketException secondReceiveEx) { - if (mllpBuffer.isEmpty()) { - log.warn("Exception encountered reading acknowledgement from {} after successful reconnect and resend", socket, secondReceiveEx); - Exception exchangeEx = new MllpAcknowledgementReceiveException("Exception encountered receiving Acknowledgement", hl7MessageBytes, receiveAckEx); - exchange.setException(exchangeEx); - } else { - byte[] partialAcknowledgment = mllpBuffer.toByteArray(); - mllpBuffer.reset(); - log.warn("Exception encountered reading a complete acknowledgement from {} after successful reconnect and resend", socket, secondReceiveEx); - Exception exchangeEx = new MllpAcknowledgementReceiveException("Exception encountered receiving complete Acknowledgement", - hl7MessageBytes, partialAcknowledgment, receiveAckEx); - exchange.setException(exchangeEx); - } + String exceptionMessageFormat = mllpBuffer.isEmpty() + ? 
"process(%s) [%s] - exception encountered reading MLLP Acknowledgement after successful reconnect and resend" + : "process(%s) [%s] - exception encountered reading complete MLLP Acknowledgement after successful reconnect and resend"; + String exceptionMessage = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket); + log.warn(exceptionMessage, secondReceiveEx); + // Send the original exception to the exchange + exchange.setException(new MllpAcknowledgementReceiveException(exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx)); } catch (SocketTimeoutException secondReadTimeoutEx) { - if (mllpBuffer.isEmpty()) { - log.warn("Timeout receiving HL7 Acknowledgment from {} after successful reconnect", socket, secondReadTimeoutEx); - exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving HL7 Acknowledgement after successful reconnect", - hl7MessageBytes, secondReadTimeoutEx)); - } else { - log.warn("Timeout receiving complete HL7 Acknowledgment from {} after successful reconnect", socket, secondReadTimeoutEx); - exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving complete HL7 Acknowledgement after successful reconnect", - hl7MessageBytes, mllpBuffer.toByteArray(), secondReadTimeoutEx)); - mllpBuffer.reset(); - } + String exceptionMessageFormat = mllpBuffer.isEmpty() + ? 
"process(%s) [%s] - timeout receiving MLLP Acknowledgment after successful reconnect and resend" + : "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment after successful reconnect and resend"; + String exceptionMessage = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket); + log.warn(exceptionMessage, secondReadTimeoutEx); + // Send the original exception to the exchange + exchange.setException(new MllpAcknowledgementTimeoutException(exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx)); mllpBuffer.resetSocket(socket); } } } } catch (SocketTimeoutException timeoutEx) { - if (mllpBuffer.isEmpty()) { - log.warn("Timeout receiving HL7 Acknowledgment from {}", socket, timeoutEx); - exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving HL7 Acknowledgement", hl7MessageBytes, timeoutEx)); - } else { - log.warn("Timeout receiving complete HL7 Acknowledgment from {}", socket, timeoutEx); - exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving complete HL7 Acknowledgement", hl7MessageBytes, mllpBuffer.toByteArray(), timeoutEx)); - mllpBuffer.reset(); - } + String exceptionMessageFormat = mllpBuffer.isEmpty() + ? 
"process(%s) [%s] - timeout receiving MLLP Acknowledgment" + : "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment"; + String exceptionMessage = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket); + log.warn(exceptionMessage, timeoutEx); + exchange.setException(new MllpAcknowledgementTimeoutException(exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), timeoutEx)); mllpBuffer.resetSocket(socket); } @@ -262,7 +264,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { if (mllpBuffer.hasCompleteEnvelope()) { byte[] acknowledgementBytes = mllpBuffer.toMllpPayload(); - log.debug("Populating message headers with the acknowledgement from the external system {}", socket); + log.debug("process({}) [{}] - populating message headers with the acknowledgement from the external system", exchange.getExchangeId(), socket); message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementBytes); if (acknowledgementBytes != null && acknowledgementBytes.length > 0) { message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String(acknowledgementBytes, getConfiguration().getCharset(exchange, acknowledgementBytes))); @@ -278,7 +280,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { } if (exchange.getException() == null) { - log.debug("Processing the acknowledgement from the external system {}", socket); + log.debug("process({}) [{}] - processing the acknowledgement from the external system", exchange.getExchangeId(), socket); try { message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, processAcknowledgment(hl7MessageBytes, acknowledgementBytes)); } catch (MllpNegativeAcknowledgementException nackEx) { @@ -289,18 +291,21 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { getEndpoint().checkAfterSendProperties(exchange, socket, log); } } else { - exchange.setException(new MllpInvalidAcknowledgementException("Invalid 
acknowledgement received", hl7MessageBytes, mllpBuffer.toByteArrayAndReset())); + String exceptionMessage = String.format("process(%s) [%s] - invalid acknowledgement received", exchange.getExchangeId(), socket); + exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset())); } } } } catch (IOException ioEx) { - log.debug("Exception encountered checking connection {}", socket, ioEx); + log.debug("process({}) [{}] - IOException encountered checking connection", exchange.getExchangeId(), socket, ioEx); exchange.setException(ioEx); mllpBuffer.resetSocket(socket); } finally { mllpBuffer.reset(); } + + log.trace("process({}) [{}] - exiting", exchange.getExchangeId(), socket); } private String processAcknowledgment(byte[] hl7MessageBytes, byte[] hl7AcknowledgementBytes) throws MllpException { @@ -328,7 +333,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable { // Found the beginning of the MSA - the next two bytes should be our acknowledgement code msaStartIndex = i + 1; if (bA != hl7AcknowledgementBytes[i + 5] && bC != hl7AcknowledgementBytes[i + 5]) { - String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2); + String errorMessage = String.format("processAcknowledgment(hl7MessageBytes[%d], hl7AcknowledgementBytes[%d]) - unsupported acknowledgement type: '%s'", hl7MessageBytes == null ? 
-1 : hl7MessageBytes.length, hl7AcknowledgementBytes.length, new String(hl7AcknowledgementBytes, i + 5, 2)); throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes); } else { switch (hl7AcknowledgementBytes[i + 6]) { diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java index 4344dfb..06136f7 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java @@ -52,12 +52,16 @@ import org.apache.camel.converter.IOConverter; import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerationException; import org.apache.camel.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * The MLLP consumer. 
*/ @ManagedResource(description = "MLLP Producer") public class MllpTcpServerConsumer extends DefaultConsumer { + final Logger log; final ExecutorService validationExecutor; final ExecutorService consumerExecutor; @@ -69,7 +73,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { public MllpTcpServerConsumer(MllpEndpoint endpoint, Processor processor) { super(endpoint, processor); - log.trace("MllpTcpServerConsumer(endpoint, processor)"); + log = LoggerFactory.getLogger(String.format("%s.%d", this.getClass().getName(), endpoint.getPort())); validationExecutor = Executors.newCachedThreadPool(); consumerExecutor = new ThreadPoolExecutor(1, getConfiguration().getMaxConcurrentConsumers(), getConfiguration().getAcceptTimeout(), TimeUnit.MILLISECONDS, new SynchronousQueue<>()); @@ -117,7 +121,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { @Override protected void doStop() throws Exception { - log.debug("doStop()"); + log.trace("doStop()"); // Close any client sockets that are currently open for (TcpSocketConsumerRunnable consumerClientSocketThread : consumerRunnables.keySet()) { @@ -193,10 +197,10 @@ public class MllpTcpServerConsumer extends DefaultConsumer { TcpServerConsumerValidationRunnable client = new TcpServerConsumerValidationRunnable(this, clientSocket, mllpBuffer); try { - log.debug("Validating consumer for Socket {}", clientSocket); + log.debug("validateConsumer({}) - submitting client for validation", clientSocket); validationExecutor.submit(client); } catch (RejectedExecutionException rejectedExecutionEx) { - log.warn("Cannot validate consumer - max validations already active"); + log.warn("validateConsumer({}) - cannot validate client - max validations already active", clientSocket); mllpBuffer.resetSocket(clientSocket); } } @@ -211,11 +215,11 @@ public class MllpTcpServerConsumer extends DefaultConsumer { consumerRunnables.put(client, System.currentTimeMillis()); try { - log.info("Starting consumer for Socket {}", 
clientSocket); + log.info("startConsumer({}) - starting consumer", clientSocket); consumerExecutor.submit(client); getEndpoint().updateLastConnectionEstablishedTicks(); } catch (RejectedExecutionException rejectedExecutionEx) { - log.warn("Cannot start consumer - max consumers already active"); + log.warn("startConsumer({}) - cannot start consumer - max consumers already active", clientSocket); mllpBuffer.resetSocket(clientSocket); } } @@ -227,7 +231,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { consumerRunnables.put(consumerRunnable, now); // Send the message on to Camel for processing and wait for the response - log.debug("Populating the exchange with received message"); + log.debug("processMessage(hl7MessageBytes[{}], {}) - populating the exchange with received payload", hl7MessageBytes == null ? -1 : hl7MessageBytes.length, consumerRunnable.getSocket()); Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut); if (getConfiguration().hasCharsetName()) { exchange.setProperty(Exchange.CHARSET_NAME, getConfiguration().getCharsetName()); @@ -267,17 +271,17 @@ public class MllpTcpServerConsumer extends DefaultConsumer { message.setBody(hl7MessageBytes, byte[].class); } - log.debug("Calling processor"); + log.debug("processMessage(hl7MessageBytes[{}], {}) - calling processor", hl7MessageBytes == null ? 
-1 : hl7MessageBytes.length, consumerRunnable.getSocket()); try { getProcessor().process(exchange); sendAcknowledgement(hl7MessageBytes, exchange, consumerRunnable); } catch (Exception unexpectedEx) { - String resetMessage = "Unexpected exception processing exchange"; + String resetMessage = "processMessage(byte[], TcpSocketConsumerRunnable) - Unexpected exception processing exchange"; consumerRunnable.resetSocket(resetMessage); getExceptionHandler().handleException(resetMessage, exchange, unexpectedEx); } } catch (Exception uowEx) { - getExceptionHandler().handleException("Unexpected exception creating Unit of Work", exchange, uowEx); + getExceptionHandler().handleException("processMessage(byte[], TcpSocketConsumerRunnable) - Unexpected exception creating Unit of Work", exchange, uowEx); } finally { if (exchange != null) { doneUoW(exchange); @@ -389,7 +393,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer { void sendAcknowledgement(byte[] originalHl7MessageBytes, Exchange exchange, TcpSocketConsumerRunnable consumerRunnable) { - log.trace("entering sendAcknowledgement(byte[], Exchange)"); + log.trace("sendAcknowledgement(originalHl7MessageBytes[{}], Exchange[{}], {}) - entering", + originalHl7MessageBytes == null ? 
-1 : originalHl7MessageBytes.length, exchange.getExchangeId(), consumerRunnable.getSocket()); getEndpoint().checkBeforeSendProperties(exchange, consumerRunnable.getSocket(), log); @@ -519,6 +524,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer { Charset charset = getConfiguration().getCharset(exchange); + // TODO: re-evaluate this - it seems that the MLLP buffer should be populated by now if (consumerRunnable.getMllpBuffer().hasCompleteEnvelope()) { // The mllpBuffer will be used if bufferWrites is set or if auto acknowledgement is used message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, consumerRunnable.getMllpBuffer().toMllpPayload()); @@ -526,7 +532,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer { // Send the acknowledgement if (log.isDebugEnabled()) { - log.debug("Sending Acknowledgement: {}", consumerRunnable.getMllpBuffer().toPrintFriendlyHl7String()); + log.debug("sendAcknowledgement(originalHl7MessageBytes[{}], Exchange[{}], {}) - Sending Acknowledgement: {}", + originalHl7MessageBytes == null ? -1 : originalHl7MessageBytes.length, exchange.getExchangeId(), consumerRunnable.getSocket(), + consumerRunnable.getMllpBuffer().toPrintFriendlyHl7String()); } try { @@ -545,7 +553,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer { // Send the acknowledgement if (log.isDebugEnabled()) { - log.debug("Sending Acknowledgement: {}", Hl7Util.convertToPrintFriendlyString(acknowledgementMessageBytes)); + log.debug("sendAcknowledgement(originalHl7MessageBytes[{}], Exchange[{}], {}) - Sending Acknowledgement: {}", + originalHl7MessageBytes == null ? 
-1 : originalHl7MessageBytes.length, exchange.getExchangeId(), consumerRunnable.getSocket(), + Hl7Util.convertToPrintFriendlyString(acknowledgementMessageBytes)); } try { diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java index 1d7daf6..3071351 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java @@ -90,29 +90,29 @@ public class MllpSocketBuffer { if (sourceBytes != null && sourceBytes.length > 0) { if (offset < 0) { throw new IndexOutOfBoundsException( - String.format("offset <%d> is less than zero", - offset)); + String.format("write(byte[%d], offset[%d], writeCount[%d]) - offset is less than zero", + sourceBytes.length, offset, writeCount)); } if (offset > sourceBytes.length) { throw new IndexOutOfBoundsException( - String.format("offset <%d> is greater than write count <%d>", - offset, writeCount)); + String.format("write(byte[%d], offset[%d], writeCount[%d]) - offset is greater than write count", + sourceBytes.length, offset, writeCount)); } if (writeCount < 0) { throw new IndexOutOfBoundsException( - String.format("write count <%d> is less than zero", - writeCount)); + String.format("write(byte[%d], offset[%d], writeCount[%d]) - write count is less than zero", + sourceBytes.length, offset, writeCount)); } if (writeCount > sourceBytes.length) { throw new IndexOutOfBoundsException( - String.format("write count <%d> is greater than length of the source byte[] <%d>", - writeCount, sourceBytes.length)); + String.format("write(byte[%d], offset[%d], writeCount[%d]) - write count is greater than length of the source byte[]", + sourceBytes.length, offset, writeCount)); } if ((offset + writeCount) - sourceBytes.length > 0) { throw new 
IndexOutOfBoundsException( - String.format("offset <%d> plus write count <%d> is <%d> is greater than length <%d> of the source byte[]", - offset, writeCount, offset + writeCount, sourceBytes.length)); + String.format("write(byte[%d], offset[%d], writeCount[%d]) - offset plus write count <%d> is greater than length of the source byte[]", + sourceBytes.length, offset, writeCount, offset + writeCount)); } ensureCapacity(writeCount); @@ -144,17 +144,17 @@ public class MllpSocketBuffer { if (hl7Payload != null && hl7Payload.length > 0) { if (hl7Payload[0] != MllpProtocolConstants.START_OF_BLOCK) { - write(MllpProtocolConstants.START_OF_BLOCK); + openMllpEnvelope(); } write(hl7Payload, offset, length); if (!hasCompleteEnvelope()) { - write(MllpProtocolConstants.PAYLOAD_TERMINATOR); + closeMllpEnvelope(); } } else { - write(MllpProtocolConstants.START_OF_BLOCK); - write(MllpProtocolConstants.PAYLOAD_TERMINATOR); + openMllpEnvelope(); + closeMllpEnvelope(); } } @@ -176,7 +176,7 @@ public class MllpSocketBuffer { public synchronized void readFrom(Socket socket, int receiveTimeout, int readTimeout) throws MllpSocketException, SocketTimeoutException { if (socket != null && socket.isConnected() && !socket.isClosed()) { - log.trace("Entering readFrom for {} ...", socket); + log.trace("readFrom({}, {}, {}) - entering", socket, receiveTimeout, readTimeout); ensureCapacity(MIN_BUFFER_SIZE); try { @@ -197,27 +197,27 @@ public class MllpSocketBuffer { } catch (SocketTimeoutException timeoutEx) { throw timeoutEx; } catch (IOException ioEx) { - final String exceptionMessage = "Exception encountered reading Socket"; + final String exceptionMessage = String.format("readFrom(%s, %d, %d) - IOException encountered", socket, receiveTimeout, readTimeout); resetSocket(socket, exceptionMessage); throw new MllpSocketException(exceptionMessage, ioEx); } finally { if (size() > 0 && !hasCompleteEnvelope()) { if (!hasEndOfData() && hasEndOfBlock() && endOfBlockIndex < size() - 1) { - 
log.warn("readFrom {} exiting with partial payload {}", socket, Hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1)); + log.warn("readFrom({}, {}, {}) - exiting with partial payload {}", socket, receiveTimeout, readTimeout, Hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1)); } } } } else { - log.warn("Socket is invalid - no data read"); + log.warn("readFrom({}, {}, {}) - no data read because Socket is invalid", socket, receiveTimeout, readTimeout); } - log.trace("Exiting readFrom ..."); + log.trace("readFrom({}, {}, {}) - exiting", socket, receiveTimeout, readTimeout); } public synchronized void writeTo(Socket socket) throws MllpSocketException { if (socket != null && socket.isConnected() && !socket.isClosed()) { - log.trace("Entering writeTo for {} ...", socket); + log.trace("writeTo({}) - entering", socket); if (!isEmpty()) { try { OutputStream socketOutputStream = socket.getOutputStream(); @@ -238,18 +238,18 @@ public class MllpSocketBuffer { } socketOutputStream.flush(); } catch (IOException ioEx) { - final String exceptionMessage = "Exception encountered writing Socket"; + final String exceptionMessage = String.format("writeTo({}) - IOException encountered", socket); resetSocket(socket, exceptionMessage); throw new MllpSocketException(exceptionMessage, ioEx); } } else { - log.warn("Ignoring call to writeTo(byte[] payload) for {} - MLLP payload is null or empty", socket); + log.warn("writeTo({}) - no data written because buffer is empty", socket); } } else { - log.warn("Socket is invalid - no data written"); + log.warn("writeTo({}) - no data written because Socket is invalid", socket); } - log.trace("Exiting writeTo ..."); + log.trace("writeTo({}) - exiting", socket); } public synchronized byte[] toByteArray() { @@ -287,9 +287,9 @@ public class MllpSocketBuffer { if (Charset.isSupported(charsetName)) { return toString(Charset.forName(charsetName)); } - log.warn("Unsupported character set name {} - using the MLLP default character set {}", 
charsetName, MllpComponent.getDefaultCharset()); + log.warn("toString(charsetName[{}]) - unsupported character set name - using the MLLP default character set {}", charsetName, MllpComponent.getDefaultCharset()); } catch (Exception charsetEx) { - log.warn("Ignoring exception encountered determining character set for name {} - using the MLLP default character set {}", + log.warn("toString(charsetName[{}]) - ignoring exception encountered determining character set - using the MLLP default character set {}", charsetName, MllpComponent.getDefaultCharset(), charsetEx); } @@ -355,9 +355,9 @@ public class MllpSocketBuffer { if (Charset.isSupported(charsetName)) { return toHl7String(Charset.forName(charsetName)); } - log.warn("Unsupported character set name {} - using the MLLP default character set {}", charsetName, MllpComponent.getDefaultCharset()); + log.warn("toHl7String(charsetName[{}]) - unsupported character set name - using the MLLP default character set {}", charsetName, MllpComponent.getDefaultCharset()); } catch (Exception charsetEx) { - log.warn("Ignoring exception encountered determining character set for name {} - using the MLLP default character set {}", + log.warn("toHl7String(charsetName[{}]) - ignoring exception encountered determining character set for name - using the MLLP default character set {}", charsetName, MllpComponent.getDefaultCharset(), charsetEx); } } @@ -612,12 +612,14 @@ public class MllpSocketBuffer { } void readSocketInputStream(InputStream socketInputStream, Socket socket) throws MllpSocketException, SocketTimeoutException { - log.trace("Entering readSocketInputStream - size = {}", size()); + log.trace("readSocketInputStream(socketInputStream, {}) - entering with initial buffer size = {}", socket, size()); try { int readCount = socketInputStream.read(buffer, availableByteCount, buffer.length - availableByteCount); if (readCount == MllpProtocolConstants.END_OF_STREAM) { + final String exceptionMessage = 
String.format("readSocketInputStream(socketInputStream, %s) - END_OF_STREAM returned from SocketInputStream.read(byte[%d], %d, %d)", + socket, buffer.length, availableByteCount, buffer.length - availableByteCount); resetSocket(socket); - throw new MllpSocketException("END_OF_STREAM returned from SocketInputStream.read(byte[], off, len)"); + throw new MllpSocketException(exceptionMessage); } if (readCount > 0) { for (int i = 0; (startOfBlockIndex == -1 || endOfBlockIndex == -1) && i < readCount; ++i) { @@ -626,22 +628,20 @@ public class MllpSocketBuffer { availableByteCount += readCount; if (hasStartOfBlock()) { - log.trace("Read {} bytes for a total of {} bytes", readCount, availableByteCount); + log.trace("readSocketInputStream(socketInputStream, {}) - read {} bytes for a total of {} bytes", socket, readCount, availableByteCount); } else { - log.warn("Ignoring {} bytes received before START_OF_BLOCK", size(), toPrintFriendlyStringAndReset()); + log.warn("readSocketInputStream(socketInputStream, {}) - ignoring {} bytes received before START_OF_BLOCK", socket, size(), toPrintFriendlyStringAndReset()); } } } catch (SocketTimeoutException timeoutEx) { throw timeoutEx; - } catch (SocketException socketEx) { - resetSocket(socket); - throw new MllpSocketException("SocketException encountered in readSocketInputStream", socketEx); } catch (IOException ioEx) { - final String exceptionMessage = "IOException thrown from SocketInputStream.read(byte[], off, len)"; + final String exceptionMessage = String.format("readSocketInputStream(socketInputStream, %s) - IOException thrown from SocketInputStream.read(byte[%d], %d, %d) from %s", + socket, buffer.length, availableByteCount, buffer.length - availableByteCount, socket); resetSocket(socket); - throw new MllpSocketException("IOException thrown from SocketInputStream.read(byte[], off, len)", ioEx); + throw new MllpSocketException(exceptionMessage, ioEx); } finally { - log.trace("Exiting readSocketInputStream - size = {}", size()); 
+ log.trace("readSocketInputStream(socketInputStream, {}) - exiting with buffer size = {}", socket, size()); } } @@ -675,7 +675,7 @@ public class MllpSocketBuffer { try { socket.shutdownInput(); } catch (IOException ignoredEx) { - // TODO: Maybe log this + log.trace("doSocketClose(socket[{}], logMessage[{}], reset[{}] - ignoring exception raised by Socket.shutdownInput()", socket, logMessage, reset, ignoredEx); } } @@ -683,17 +683,17 @@ public class MllpSocketBuffer { try { socket.shutdownOutput(); } catch (IOException ignoredEx) { - // TODO: Maybe log this + log.trace("doSocketClose(socket[{}], logMessage[{}], reset[{}] - ignoring exception raised by Socket.shutdownOutput()", socket, logMessage, reset, ignoredEx); } } if (reset) { + final boolean on = true; + final int linger = 0; try { - final boolean on = true; - final int linger = 0; socket.setSoLinger(on, linger); } catch (IOException ignoredEx) { - // TODO: Maybe log this + log.trace("doSocketClose(socket[{}], logMessage[{}], reset[{}] - ignoring exception raised by Socket.setSoLinger({}, {})", socket, logMessage, reset, on, linger, ignoredEx); } } @@ -701,6 +701,7 @@ public class MllpSocketBuffer { socket.close(); } catch (IOException ignoredEx) { // TODO: Maybe log this + log.trace("doSocketClose(socket[{}], logMessage[{}], reset[{}] - ignoring exception raised by Socket.close()", socket, logMessage, reset, ignoredEx); } } } diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java index cd87fc6..ee473c4 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static 
org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** @@ -38,6 +39,21 @@ public class MllpSocketBufferTest extends SocketBufferTestSupport { * @throws Exception in the event of a test error. */ @Test + public void testConstructorWithNullEndpoing() throws Exception { + try { + new MllpSocketBuffer(null); + fail("Constructor should have thrown an exception with a null Endpoint argument"); + } catch (IllegalArgumentException expectedEx) { + assertEquals("MllpEndpoint cannot be null", expectedEx.getMessage()); + } + } + + /** + * Description of test. + * + * @throws Exception in the event of a test error. + */ + @Test public void testToHl7StringWithRequiredEndOfData() throws Exception { assertNull(instance.toHl7String()); diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java index 903669e..ae097ac 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java @@ -228,7 +228,7 @@ public class MllpSocketBufferWriteTest extends SocketBufferTestSupport { instance.write(payload, -5, payload.length); fail("Exception should have been thrown"); } catch (IndexOutOfBoundsException expectedEx) { - assertEquals("offset <-5> is less than zero", expectedEx.getMessage()); + assertEquals("write(byte[4], offset[-5], writeCount[4]) - offset is less than zero", expectedEx.getMessage()); } } @@ -245,7 +245,7 @@ public class MllpSocketBufferWriteTest extends SocketBufferTestSupport { instance.write(payload, payload.length + 1, payload.length); fail("Exception should have been thrown"); } catch (IndexOutOfBoundsException expectedEx) { - assertEquals("offset <5> is greater than write count 
<4>", expectedEx.getMessage()); + assertEquals("write(byte[4], offset[5], writeCount[4]) - offset is greater than write count", expectedEx.getMessage()); } } @@ -261,7 +261,7 @@ public class MllpSocketBufferWriteTest extends SocketBufferTestSupport { instance.write("BLAH".getBytes(), 0, -5); fail("Exception should have been thrown"); } catch (IndexOutOfBoundsException expectedEx) { - assertEquals("write count <-5> is less than zero", expectedEx.getMessage()); + assertEquals("write(byte[4], offset[0], writeCount[-5]) - write count is less than zero", expectedEx.getMessage()); } } @@ -278,21 +278,21 @@ public class MllpSocketBufferWriteTest extends SocketBufferTestSupport { instance.write(payload, 0, payload.length + 1); fail("Exception should have been thrown"); } catch (IndexOutOfBoundsException expectedEx) { - assertEquals("write count <5> is greater than length of the source byte[] <4>", expectedEx.getMessage()); + assertEquals("write(byte[4], offset[0], writeCount[5]) - write count is greater than length of the source byte[]", expectedEx.getMessage()); } try { instance.write("BLAH".getBytes(), 1, payload.length); fail("Exception should have been thrown"); } catch (IndexOutOfBoundsException expectedEx) { - assertEquals("offset <1> plus write count <4> is <5> is greater than length <4> of the source byte[]", expectedEx.getMessage()); + assertEquals("write(byte[4], offset[1], writeCount[4]) - offset plus write count <5> is greater than length of the source byte[]", expectedEx.getMessage()); } try { instance.write("BLAH".getBytes(), 2, payload.length - 1); fail("Exception should have been thrown"); } catch (IndexOutOfBoundsException expectedEx) { - assertEquals("offset <2> plus write count <3> is <5> is greater than length <4> of the source byte[]", expectedEx.getMessage()); + assertEquals("write(byte[4], offset[2], writeCount[3]) - offset plus write count <5> is greater than length of the source byte[]", expectedEx.getMessage()); } } -- To stop receiving 
notification emails like this one, please contact qu...@apache.org.