Repository: camel Updated Branches: refs/heads/master bcd4577bd -> 71c6b0591
CAMEL-10512-add managedOperations for connection status check for both consumer&producer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/71c6b059 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/71c6b059 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/71c6b059 Branch: refs/heads/master Commit: 71c6b059158899b985094f202ac1371e3920ab1e Parents: bcd4577 Author: onders86 <ondersez...@gmail.com> Authored: Fri May 5 16:09:55 2017 +0300 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 8 14:38:09 2017 +0200 ---------------------------------------------------------------------- .../camel/component/mllp/MllpEndpoint.java | 2 + .../component/mllp/MllpTcpClientProducer.java | 34 ++++++++++++++-- .../component/mllp/MllpTcpServerConsumer.java | 41 ++++++++++++++++++-- 3 files changed, 70 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/71c6b059/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java index 7077b2d..efc0193 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java @@ -21,6 +21,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; @@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory; * into the log files. Logging of PHI can be globally disabled by setting the org.apache.camel.mllp.logPHI system * property to false. */ +@ManagedResource(description = "Mllp Endpoint") @UriEndpoint(firstVersion = "2.17.0", scheme = "mllp", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "hl7") public class MllpEndpoint extends DefaultEndpoint { public static final char START_OF_BLOCK = 0x0b; // VT (vertical tab) - decimal 11, octal 013 http://git-wip-us.apache.org/repos/asf/camel/blob/71c6b059/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java index e1827cb..8392b4b 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java @@ -23,6 +23,8 @@ import java.net.SocketException; import org.apache.camel.Exchange; import org.apache.camel.Message; +import org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.mllp.impl.Hl7Util; import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter; import org.apache.camel.component.mllp.impl.MllpSocketReader; @@ -45,6 +47,7 @@ import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER; /** * The MLLP producer. */ +@ManagedResource(description = "MllpTcpClient Producer") public class MllpTcpClientProducer extends DefaultProducer { MllpEndpoint endpoint; @@ -76,6 +79,16 @@ public class MllpTcpClientProducer extends DefaultProducer { super.doStop(); } + @ManagedOperation(description = "Close client socket") + public void closeMllpSocket() { + MllpSocketUtil.close(socket, log, "JMX triggered closing socket"); + } + + @ManagedOperation(description = "Reset client socket") + public void resetMllpSocket() { + MllpSocketUtil.reset(socket, log, "JMX triggered resetting socket"); + } + @Override public void process(Exchange exchange) throws Exception { log.trace("process(exchange)"); @@ -235,11 +248,13 @@ public class MllpTcpClientProducer extends DefaultProducer { } /** - * Validate the TCP Connection + * Validate the TCP Connection, if closed opens up the socket with + * the value set via endpoint configuration * - * @return null if the connection is valid, otherwise the Exception encountered checking the connection + * @throws IOException if the connection is not valid, otherwise the Exception is not + * encountered while checking the connection */ - void checkConnection() throws IOException { + private void checkConnection() throws IOException { if (null == socket || socket.isClosed() || !socket.isConnected()) { socket = new Socket(); @@ -276,6 +291,19 @@ public class MllpTcpClientProducer extends DefaultProducer { mllpSocketWriter = new MllpSocketWriter(socket, false); } } + return; + } + + @ManagedOperation(description = "Check client connection") + public boolean managedCheckConnection() { + boolean isValid = true; + try { + checkConnection(); + } catch (IOException ioEx) { + isValid = false; + log.debug("JMX check connection: {}", ioEx); + } + return isValid; } } http://git-wip-us.apache.org/repos/asf/camel/blob/71c6b059/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java index b1be22b..d036944 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java @@ -33,6 +33,8 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.mllp.impl.Hl7Util; import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter; import org.apache.camel.component.mllp.impl.MllpSocketReader; @@ -74,6 +76,7 @@ import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER; /** * The MLLP consumer. */ +@ManagedResource(description = "MllpTcpServer Consumer") public class MllpTcpServerConsumer extends DefaultConsumer { public static final int SOCKET_STARTUP_TEST_WAIT = 100; public static final int SOCKET_STARTUP_TEST_READ_TIMEOUT = 250; @@ -95,6 +98,33 @@ public class MllpTcpServerConsumer extends DefaultConsumer { protected void doStart() throws Exception { log.debug("doStart() - creating acceptor thread"); + startMllpConsumer(); + + super.doStart(); + } + + @ManagedOperation(description = "Check server connection") + public boolean managedCheckConnection() { + boolean isValid = true; + try { + InetSocketAddress socketAddress; + if (null == endpoint.getHostname()) { + socketAddress = new InetSocketAddress(endpoint.getPort()); + } else { + socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort()); + } + Socket checkSocket = new Socket(); + checkSocket.connect(socketAddress, 100); + checkSocket.close(); + } catch (Exception e) { + isValid = false; + log.debug("JMX check connection: {}", e); + } + return isValid; + } + + @ManagedOperation(description = "Starts serverSocket thread and waits for requests") + public void startMllpConsumer() throws IOException, InterruptedException { ServerSocket serverSocket = new ServerSocket(); if (null != endpoint.receiveBufferSize) { serverSocket.setReceiveBufferSize(endpoint.receiveBufferSize); @@ -129,14 +159,19 @@ public class MllpTcpServerConsumer extends DefaultConsumer { serverSocketThread = new ServerSocketThread(serverSocket); serverSocketThread.start(); - - super.doStart(); } @Override protected void doStop() throws Exception { log.debug("doStop()"); + stopMllpConsumer(); + + super.doStop(); + } + + @ManagedOperation(description = "Stops client threads and serverSocket thread") + public void stopMllpConsumer() { // Close any client sockets that are currently open for (ClientSocketThread clientSocketThread: clientThreads) { clientSocketThread.interrupt(); @@ -158,8 +193,6 @@ public class MllpTcpServerConsumer extends DefaultConsumer { } serverSocketThread = null; - - super.doStop(); } /**