Repository: camel
Updated Branches:
  refs/heads/master bcd4577bd -> 71c6b0591


CAMEL-10512: Add managed operations for connection status checks on both 
consumer and producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/71c6b059
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/71c6b059
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/71c6b059

Branch: refs/heads/master
Commit: 71c6b059158899b985094f202ac1371e3920ab1e
Parents: bcd4577
Author: onders86 <ondersez...@gmail.com>
Authored: Fri May 5 16:09:55 2017 +0300
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon May 8 14:38:09 2017 +0200

----------------------------------------------------------------------
 .../camel/component/mllp/MllpEndpoint.java      |  2 +
 .../component/mllp/MllpTcpClientProducer.java   | 34 ++++++++++++++--
 .../component/mllp/MllpTcpServerConsumer.java   | 41 ++++++++++++++++++--
 3 files changed, 70 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/71c6b059/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index 7077b2d..efc0193 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -21,6 +21,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
@@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory;
  * into the log files. Logging of PHI can be globally disabled by setting the 
org.apache.camel.mllp.logPHI system
  * property to false.
  */
+@ManagedResource(description = "Mllp Endpoint")
 @UriEndpoint(firstVersion = "2.17.0", scheme = "mllp", title = "MLLP", syntax 
= "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = 
"hl7")
 public class MllpEndpoint extends DefaultEndpoint {
     public static final char START_OF_BLOCK = 0x0b;      // VT (vertical tab)  
      - decimal 11, octal 013

http://git-wip-us.apache.org/repos/asf/camel/blob/71c6b059/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index e1827cb..8392b4b 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -23,6 +23,8 @@ import java.net.SocketException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.component.mllp.impl.Hl7Util;
 import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter;
 import org.apache.camel.component.mllp.impl.MllpSocketReader;
@@ -45,6 +47,7 @@ import static 
org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER;
 /**
  * The MLLP producer.
  */
+@ManagedResource(description = "MllpTcpClient Producer")
 public class MllpTcpClientProducer extends DefaultProducer {
     MllpEndpoint endpoint;
 
@@ -76,6 +79,16 @@ public class MllpTcpClientProducer extends DefaultProducer {
         super.doStop();
     }
 
+    /**
+     * Managed (JMX) operation that closes the producer's client socket by
+     * handing it to MllpSocketUtil.close together with this producer's logger.
+     */
+    @ManagedOperation(description = "Close client socket")
+    public void closeMllpSocket() {
+        final String reason = "JMX triggered closing socket";
+        MllpSocketUtil.close(socket, log, reason);
+    }
+
+    /**
+     * Managed (JMX) operation that resets the producer's client socket by
+     * handing it to MllpSocketUtil.reset together with this producer's logger.
+     */
+    @ManagedOperation(description = "Reset client socket")
+    public void resetMllpSocket() {
+        final String reason = "JMX triggered resetting socket";
+        MllpSocketUtil.reset(socket, log, reason);
+    }
+
     @Override
     public void process(Exchange exchange) throws Exception {
         log.trace("process(exchange)");
@@ -235,11 +248,13 @@ public class MllpTcpClientProducer extends 
DefaultProducer {
     }
 
     /**
-     * Validate the TCP Connection
+     * Validate the TCP Connection, if closed opens up the socket with
+     * the value set via endpoint configuration
      *
-     * @return null if the connection is valid, otherwise the Exception 
encountered checking the connection
+     * @throws IOException if an error is encountered while checking (or
+     *                     re-opening) the connection
      */
-    void checkConnection() throws IOException {
+    private void checkConnection() throws IOException {
         if (null == socket || socket.isClosed() || !socket.isConnected()) {
             socket = new Socket();
 
@@ -276,6 +291,19 @@ public class MllpTcpClientProducer extends DefaultProducer 
{
                 mllpSocketWriter = new MllpSocketWriter(socket, false);
             }
         }
+        return;
+    }
+
+    /**
+     * Managed (JMX) operation that validates the producer's TCP connection by
+     * delegating to {@link #checkConnection()}.
+     *
+     * @return true if checkConnection() completed without an IOException,
+     *         false if it threw one (the exception is logged at DEBUG level)
+     */
+    @ManagedOperation(description = "Check client connection")
+    public boolean managedCheckConnection() {
+        try {
+            checkConnection();
+            return true;
+        } catch (IOException ioEx) {
+            // The message fills the '{}' placeholder; the exception itself is passed as
+            // the trailing argument so SLF4J logs it as a Throwable with its stack trace.
+            log.debug("JMX check connection: {}", ioEx.getMessage(), ioEx);
+            return false;
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/71c6b059/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index b1be22b..d036944 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -33,6 +33,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.component.mllp.impl.Hl7Util;
 import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter;
 import org.apache.camel.component.mllp.impl.MllpSocketReader;
@@ -74,6 +76,7 @@ import static 
org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER;
 /**
  * The MLLP consumer.
  */
+@ManagedResource(description = "MllpTcpServer Consumer")
 public class MllpTcpServerConsumer extends DefaultConsumer {
     public static final int SOCKET_STARTUP_TEST_WAIT = 100;
     public static final int SOCKET_STARTUP_TEST_READ_TIMEOUT = 250;
@@ -95,6 +98,33 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         log.debug("doStart() - creating acceptor thread");
 
+        startMllpConsumer();
+        // Base-class start-up runs only after startMllpConsumer() has returned without throwing
+        super.doStart();
+    }
+
+    /**
+     * Managed (JMX) operation that probes the endpoint's configured address by
+     * opening a short-lived TCP connection to it.
+     *
+     * If the endpoint has no hostname, the address is built from the port alone.
+     *
+     * @return true if the probe socket connected and was closed without error,
+     *         false if any exception was raised (it is logged at DEBUG level)
+     */
+    @ManagedOperation(description = "Check server connection")
+    public boolean managedCheckConnection() {
+        boolean isValid = true;
+        try {
+            InetSocketAddress socketAddress;
+            if (null == endpoint.getHostname()) {
+                socketAddress = new InetSocketAddress(endpoint.getPort());
+            } else {
+                socketAddress = new InetSocketAddress(endpoint.getHostname(),
+                        endpoint.getPort());
+            }
+            // try-with-resources closes the probe socket on every path, including
+            // when connect() fails or times out (timeout argument is in milliseconds).
+            try (Socket checkSocket = new Socket()) {
+                checkSocket.connect(socketAddress, 100);
+            }
+        } catch (Exception e) {
+            // Broad catch kept deliberately: any failure here means "not valid".
+            isValid = false;
+            // The message fills the '{}' placeholder; the exception itself is passed as
+            // the trailing argument so SLF4J logs it as a Throwable with its stack trace.
+            log.debug("JMX check connection: {}", e.getMessage(), e);
+        }
+        return isValid;
+    }
+
+    @ManagedOperation(description = "Starts serverSocket thread and waits for 
requests")
+    public void startMllpConsumer() throws IOException, InterruptedException {
         ServerSocket serverSocket = new ServerSocket();
         if (null != endpoint.receiveBufferSize) {
             serverSocket.setReceiveBufferSize(endpoint.receiveBufferSize);
@@ -129,14 +159,19 @@ public class MllpTcpServerConsumer extends 
DefaultConsumer {
 
         serverSocketThread = new ServerSocketThread(serverSocket);
         serverSocketThread.start();
-
-        super.doStart();
     }
 
     @Override
     protected void doStop() throws Exception {
         log.debug("doStop()");
 
+        stopMllpConsumer();
+        // Base-class shutdown runs after the MLLP-specific teardown above
+        super.doStop();
+    }
+
+    @ManagedOperation(description = "Stops client threads and serverSocket 
thread")
+    public void stopMllpConsumer() {
         // Close any client sockets that are currently open
         for (ClientSocketThread clientSocketThread: clientThreads) {
             clientSocketThread.interrupt();
@@ -158,8 +193,6 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         }
 
         serverSocketThread = null;
-
-        super.doStop();
     }
 
     /**

Reply via email to