# IGNITE-943 Move TcpDiscoverySpiAdapter.MessageWorkerAdapter to TcpDiscoverySpi
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f43cbbbb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f43cbbbb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f43cbbbb Branch: refs/heads/ignite-943 Commit: f43cbbbb915da375fa89edf3120a0c8d52c26bfe Parents: e5849fe Author: sevdokimov <sevdoki...@gridgain.com> Authored: Tue May 26 16:57:22 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Tue May 26 16:57:22 2015 +0300 ---------------------------------------------------------------------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 91 ++++++++++++++++++++ .../discovery/tcp/TcpDiscoverySpiAdapter.java | 90 ------------------- 2 files changed, 91 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f43cbbbb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 2123afc..37a07d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.events.*; import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.io.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; @@ -5321,4 +5322,94 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov U.closeQuiet(sock); } } + + /** + * Base class for message workers. + */ + protected abstract class MessageWorkerAdapter extends IgniteSpiThread { + /** Pre-allocated output stream (100K). */ + private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024); + + /** Message queue. */ + private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>(); + + /** Backed interrupted flag. */ + private volatile boolean interrupted; + + /** + * @param name Thread name. + */ + protected MessageWorkerAdapter(String name) { + super(gridName, name, log); + + setPriority(threadPri); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']'); + + while (!isInterrupted()) { + TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS); + + if (msg == null) + continue; + + processMessage(msg); + } + } + + /** {@inheritDoc} */ + @Override public void interrupt() { + interrupted = true; + + super.interrupt(); + } + + /** {@inheritDoc} */ + @Override public boolean isInterrupted() { + return interrupted || super.isInterrupted(); + } + + /** + * @return Current queue size. + */ + int queueSize() { + return queue.size(); + } + + /** + * Adds message to queue. + * + * @param msg Message to add. + */ + void addMessage(TcpDiscoveryAbstractMessage msg) { + if (msg.highPriority()) + queue.addFirst(msg); + else + queue.add(msg); + + if (log.isDebugEnabled()) + log.debug("Message has been added to queue: " + msg); + } + + /** + * @param msg Message. + */ + protected abstract void processMessage(TcpDiscoveryAbstractMessage msg); + + /** + * @param sock Socket. + * @param msg Message. + * @throws IOException If IO failed. + * @throws IgniteCheckedException If marshalling failed. + */ + protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) + throws IOException, IgniteCheckedException { + bout.reset(); + + TcpDiscoverySpi.this.writeToSocket(sock, msg, bout); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f43cbbbb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java index d5ea9f2..4349ebc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java @@ -1185,96 +1185,6 @@ public abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements } /** - * Base class for message workers. - */ - protected abstract class MessageWorkerAdapter extends IgniteSpiThread { - /** Pre-allocated output stream (100K). */ - private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024); - - /** Message queue. */ - private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>(); - - /** Backed interrupted flag. */ - private volatile boolean interrupted; - - /** - * @param name Thread name. - */ - protected MessageWorkerAdapter(String name) { - super(gridName, name, log); - - setPriority(threadPri); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']'); - - while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS); - - if (msg == null) - continue; - - processMessage(msg); - } - } - - /** {@inheritDoc} */ - @Override public void interrupt() { - interrupted = true; - - super.interrupt(); - } - - /** {@inheritDoc} */ - @Override public boolean isInterrupted() { - return interrupted || super.isInterrupted(); - } - - /** - * @return Current queue size. - */ - int queueSize() { - return queue.size(); - } - - /** - * Adds message to queue. - * - * @param msg Message to add. - */ - void addMessage(TcpDiscoveryAbstractMessage msg) { - if (msg.highPriority()) - queue.addFirst(msg); - else - queue.add(msg); - - if (log.isDebugEnabled()) - log.debug("Message has been added to queue: " + msg); - } - - /** - * @param msg Message. - */ - protected abstract void processMessage(TcpDiscoveryAbstractMessage msg); - - /** - * @param sock Socket. - * @param msg Message. - * @throws IOException If IO failed. - * @throws IgniteCheckedException If marshalling failed. - */ - protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) - throws IOException, IgniteCheckedException { - bout.reset(); - - TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout); - } - } - - /** * Allow to connect to addresses parallel. */ protected class SocketMultiConnector implements AutoCloseable {