# IGNITE-943 Move TcpDiscoverySpiAdapter.MessageWorkerAdapter to TcpDiscoverySpi


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f43cbbbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f43cbbbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f43cbbbb

Branch: refs/heads/ignite-943
Commit: f43cbbbb915da375fa89edf3120a0c8d52c26bfe
Parents: e5849fe
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Tue May 26 16:57:22 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Tue May 26 16:57:22 2015 +0300

----------------------------------------------------------------------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 91 ++++++++++++++++++++
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   | 90 -------------------
 2 files changed, 91 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f43cbbbb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 2123afc..37a07d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.events.*;
 import org.apache.ignite.internal.processors.security.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.io.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -5321,4 +5322,94 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
             U.closeQuiet(sock);
         }
     }
+
+    /**
+     * Base class for message workers.
+     */
+    protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
+        /** Pre-allocated output stream (100K). */
+        private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);
+
+        /** Message queue. */
+        private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
+
+        /** Backed interrupted flag. */
+        private volatile boolean interrupted;
+
+        /**
+         * @param name Thread name.
+         */
+        protected MessageWorkerAdapter(String name) {
+            super(gridName, name, log);
+
+            setPriority(threadPri);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']');
+
+            while (!isInterrupted()) {
+                TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
+
+                if (msg == null)
+                    continue;
+
+                processMessage(msg);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void interrupt() {
+            interrupted = true;
+
+            super.interrupt();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isInterrupted() {
+            return interrupted || super.isInterrupted();
+        }
+
+        /**
+         * @return Current queue size.
+         */
+        int queueSize() {
+            return queue.size();
+        }
+
+        /**
+         * Adds message to queue.
+         *
+         * @param msg Message to add.
+         */
+        void addMessage(TcpDiscoveryAbstractMessage msg) {
+            if (msg.highPriority())
+                queue.addFirst(msg);
+            else
+                queue.add(msg);
+
+            if (log.isDebugEnabled())
+                log.debug("Message has been added to queue: " + msg);
+        }
+
+        /**
+         * @param msg Message.
+         */
+        protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
+
+        /**
+         * @param sock Socket.
+         * @param msg Message.
+         * @throws IOException If IO failed.
+         * @throws IgniteCheckedException If marshalling failed.
+         */
+        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+            throws IOException, IgniteCheckedException {
+            bout.reset();
+
+            TcpDiscoverySpi.this.writeToSocket(sock, msg, bout);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f43cbbbb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index d5ea9f2..4349ebc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -1185,96 +1185,6 @@ public abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements
     }
 
     /**
-     * Base class for message workers.
-     */
-    protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
-        /** Pre-allocated output stream (100K). */
-        private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);
-
-        /** Message queue. */
-        private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
-
-        /** Backed interrupted flag. */
-        private volatile boolean interrupted;
-
-        /**
-         * @param name Thread name.
-         */
-        protected MessageWorkerAdapter(String name) {
-            super(gridName, name, log);
-
-            setPriority(threadPri);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']');
-
-            while (!isInterrupted()) {
-                TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
-
-                if (msg == null)
-                    continue;
-
-                processMessage(msg);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void interrupt() {
-            interrupted = true;
-
-            super.interrupt();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isInterrupted() {
-            return interrupted || super.isInterrupted();
-        }
-
-        /**
-         * @return Current queue size.
-         */
-        int queueSize() {
-            return queue.size();
-        }
-
-        /**
-         * Adds message to queue.
-         *
-         * @param msg Message to add.
-         */
-        void addMessage(TcpDiscoveryAbstractMessage msg) {
-            if (msg.highPriority())
-                queue.addFirst(msg);
-            else
-                queue.add(msg);
-
-            if (log.isDebugEnabled())
-                log.debug("Message has been added to queue: " + msg);
-        }
-
-        /**
-         * @param msg Message.
-         */
-        protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
-
-        /**
-         * @param sock Socket.
-         * @param msg Message.
-         * @throws IOException If IO failed.
-         * @throws IgniteCheckedException If marshalling failed.
-         */
-        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
-            throws IOException, IgniteCheckedException {
-            bout.reset();
-
-            TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout);
-        }
-    }
-
-    /**
      * Allow to connect to addresses parallel.
      */
     protected class SocketMultiConnector implements AutoCloseable {

Reply via email to