# IGNITE-943 Rename TcpDiscoveryImpl.adapter to spi
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0e192ef8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0e192ef8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0e192ef8 Branch: refs/heads/ignite-218 Commit: 0e192ef84def1cdfe121e1e132c0b9740341fb9c Parents: 92b2a57 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Thu May 28 12:11:22 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Thu May 28 14:46:07 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 138 ++++---- .../ignite/spi/discovery/tcp/ServerImpl.java | 348 +++++++++---------- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 14 +- 3 files changed, 250 insertions(+), 250 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0e192ef8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index aa254ec..2171085 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -116,7 +116,7 @@ class ClientImpl extends TcpDiscoveryImpl { b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); b.append(" Socket reader: ").append(threadStatus(sockReader)).append(U.nl()); b.append(" Socket writer: ").append(threadStatus(sockWriter)).append(U.nl()); - b.append(" Socket timeout worker: ").append(threadStatus(adapter.sockTimeoutWorker)).append(U.nl()); + b.append(" Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl()); b.append(U.nl()); @@ -127,7 +127,7 @@ class ClientImpl extends TcpDiscoveryImpl { b.append(U.nl()); - b.append("Stats: ").append(adapter.stats).append(U.nl()); + b.append("Stats: ").append(spi.stats).append(U.nl()); U.quietAndInfo(log, b.toString()); } @@ -153,9 +153,9 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - adapter.initLocalNode(0, true); + spi.initLocalNode(0, true); - locNode = adapter.locNode; + locNode = spi.locNode; sockWriter = new SocketWriter(); sockWriter.start(); @@ -176,9 +176,9 @@ class ClientImpl extends TcpDiscoveryImpl { throw new IgniteSpiException("Thread has been interrupted.", e); } - timer.schedule(new HeartbeatSender(), adapter.hbFreq, adapter.hbFreq); + timer.schedule(new HeartbeatSender(), spi.hbFreq, spi.hbFreq); - adapter.printStartInfo(); + spi.printStartInfo(); } /** {@inheritDoc} */ @@ -189,7 +189,7 @@ class ClientImpl extends TcpDiscoveryImpl { msgWorker.addMessage(SPI_STOP); try { - if (!leaveLatch.await(adapter.netTimeout, MILLISECONDS)) + if (!leaveLatch.await(spi.netTimeout, MILLISECONDS)) U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']'); } catch (InterruptedException ignored) { @@ -210,7 +210,7 @@ class ClientImpl extends TcpDiscoveryImpl { U.join(sockWriter, log); U.join(sockReader, log); - adapter.printStopInfo(); + spi.printStopInfo(); } /** {@inheritDoc} */ @@ -253,7 +253,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (oldFut != null) fut = oldFut; else { - if (adapter.getSpiContext().isStopping()) { + if (spi.getSpiContext().isStopping()) { if (pingFuts.remove(nodeId, fut)) fut.onDone(false); @@ -267,7 +267,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (pingFuts.remove(nodeId, finalFut)) finalFut.onDone(false); } - }, adapter.netTimeout); + }, spi.netTimeout); sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId)); } @@ -297,13 +297,13 @@ class ClientImpl extends TcpDiscoveryImpl { leaveLatch.countDown(); joinLatch.countDown(); - adapter.getSpiContext().deregisterPorts(); + spi.getSpiContext().deregisterPorts(); Collection<ClusterNode> rmts = getRemoteNodes(); // This is restart/disconnection and remote nodes are not empty. // We need to fire FAIL event for each. - DiscoverySpiListener lsnr = adapter.lsnr; + DiscoverySpiListener lsnr = spi.lsnr; if (lsnr != null) { for (ClusterNode n : rmts) { @@ -330,7 +330,7 @@ class ClientImpl extends TcpDiscoveryImpl { try { sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, - adapter.marsh.marshal(evt))); + spi.marsh.marshal(evt))); } catch (IgniteCheckedException e) { throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); @@ -364,16 +364,16 @@ class ClientImpl extends TcpDiscoveryImpl { throw new InterruptedException(); while (addrs == null || addrs.isEmpty()) { - addrs = adapter.resolvedAddresses(); + addrs = spi.resolvedAddresses(); if (!F.isEmpty(addrs)) { if (log.isDebugEnabled()) log.debug("Resolved addresses from IP finder: " + addrs); } else { - U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + adapter.ipFinder); + U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + spi.ipFinder); - if (adapter.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > adapter.joinTimeout) + if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout) return null; Thread.sleep(2000); @@ -401,20 +401,20 @@ class ClientImpl extends TcpDiscoveryImpl { UUID rmtNodeId = t.get2(); - adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - ts); + spi.stats.onClientSocketInitialized(U.currentTimeMillis() - ts); locNode.clientRouterNodeId(rmtNodeId); TcpDiscoveryAbstractMessage msg = recon ? new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId) : - new TcpDiscoveryJoinRequestMessage(locNode, adapter.collectExchangeData(getLocalNodeId())); + new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId())); msg.client(true); - adapter.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg); - int res = adapter.readReceipt(sock, adapter.ackTimeout); + int res = spi.readReceipt(sock, spi.ackTimeout); switch (res) { case RES_OK: @@ -447,7 +447,7 @@ class ClientImpl extends TcpDiscoveryImpl { U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " + "in 2000ms): " + addrs0); - if (adapter.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > adapter.joinTimeout) + if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout) return null; Thread.sleep(2000); @@ -470,11 +470,11 @@ class ClientImpl extends TcpDiscoveryImpl { topHist.put(topVer, allNodes); - if (topHist.size() > adapter.topHistSize) + if (topHist.size() > spi.topHistSize) topHist.pollFirstEntry(); assert topHist.lastKey() == topVer; - assert topHist.size() <= adapter.topHistSize; + assert topHist.size() <= spi.topHistSize; } return allNodes; @@ -505,15 +505,15 @@ class ClientImpl extends TcpDiscoveryImpl { private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException { assert addr != null; - Socket sock = adapter.openSocket(addr); + Socket sock = spi.openSocket(addr); TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId()); req.client(true); - adapter.writeToSocket(sock, req); + spi.writeToSocket(sock, req); - TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, adapter.ackTimeout); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, spi.ackTimeout); UUID nodeId = res.creatorNodeId(); @@ -529,11 +529,11 @@ class ClientImpl extends TcpDiscoveryImpl { U.interrupt(sockWriter); U.interrupt(msgWorker); - U.interrupt(adapter.sockTimeoutWorker); + U.interrupt(spi.sockTimeoutWorker); U.join(sockWriter, log); U.join(msgWorker, log); - U.join(adapter.sockTimeoutWorker, log); + U.join(spi.sockTimeoutWorker, log); } /** {@inheritDoc} */ @@ -569,9 +569,9 @@ class ClientImpl extends TcpDiscoveryImpl { private class HeartbeatSender extends TimerTask { /** {@inheritDoc} */ @Override public void run() { - if (!adapter.getSpiContext().isStopping() && sockWriter.isOnline()) { + if (!spi.getSpiContext().isStopping() && sockWriter.isOnline()) { TcpDiscoveryClientHeartbeatMessage msg = new TcpDiscoveryClientHeartbeatMessage(getLocalNodeId(), - adapter.metricsProvider.metrics()); + spi.metricsProvider.metrics()); msg.client(true); @@ -596,7 +596,7 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ protected SocketReader() { - super(adapter.ignite().name(), "tcp-client-disco-sock-reader", log); + super(spi.ignite().name(), "tcp-client-disco-sock-reader", log); } /** @@ -640,7 +640,7 @@ class ClientImpl extends TcpDiscoveryImpl { TcpDiscoveryAbstractMessage msg; try { - msg = adapter.marsh.unmarshal(in, U.gridClassLoader()); + msg = spi.marsh.unmarshal(in, U.gridClassLoader()); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -670,9 +670,9 @@ class ClientImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Message has been received: " + msg); - adapter.stats.onMessageReceived(msg); + spi.stats.onMessageReceived(msg); - if (adapter.ensured(msg)) + if (spi.ensured(msg)) lastMsgId = msg.id(); msgWorker.addMessage(msg); @@ -715,7 +715,7 @@ class ClientImpl extends TcpDiscoveryImpl { * */ protected SocketWriter() { - super(adapter.ignite().name(), "tcp-client-disco-sock-writer", log); + super(spi.ignite().name(), "tcp-client-disco-sock-writer", log); } /** @@ -775,11 +775,11 @@ class ClientImpl extends TcpDiscoveryImpl { } } - for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : adapter.sendMsgLsnrs) + for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs) msgLsnr.apply(msg); try { - adapter.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg); msg = null; } @@ -814,7 +814,7 @@ class ClientImpl extends TcpDiscoveryImpl { * */ protected Reconnector() { - super(adapter.ignite().name(), "tcp-client-disco-msg-worker", log); + super(spi.ignite().name(), "tcp-client-disco-msg-worker", log); } /** @@ -851,7 +851,7 @@ class ClientImpl extends TcpDiscoveryImpl { // Wait for while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = adapter.marsh.unmarshal(in, U.gridClassLoader()); + TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); if (msg instanceof TcpDiscoveryClientReconnectMessage) { TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg; @@ -902,13 +902,13 @@ class ClientImpl extends TcpDiscoveryImpl { * */ private MessageWorker() { - super(adapter.ignite().name(), "tcp-client-disco-msg-worker", log); + super(spi.ignite().name(), "tcp-client-disco-msg-worker", log); } /** {@inheritDoc} */ @SuppressWarnings("InfiniteLoopStatement") @Override protected void body() throws InterruptedException { - adapter.stats.onJoinStarted(); + spi.stats.onJoinStarted(); try { final Socket sock = joinTopology(false); @@ -930,7 +930,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (joinLatch.getCount() > 0) queue.add(JOIN_TIMEOUT); } - }, adapter.netTimeout); + }, spi.netTimeout); sockReader.setSocket(sock, locNode.clientRouterNodeId()); @@ -940,7 +940,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (msg == JOIN_TIMEOUT) { if (joinLatch.getCount() > 0) { joinErr = new IgniteSpiException("Join process timed out [sock=" + sock + - ", timeout=" + adapter.netTimeout + ']'); + ", timeout=" + spi.netTimeout + ']'); joinLatch.countDown(); @@ -948,7 +948,7 @@ class ClientImpl extends TcpDiscoveryImpl { } } else if (msg == SPI_STOP) { - assert adapter.getSpiContext().isStopping(); + assert spi.getSpiContext().isStopping(); if (currSock != null) { TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); @@ -972,7 +972,7 @@ class ClientImpl extends TcpDiscoveryImpl { break; } else { - if (adapter.getSpiContext().isStopping() || segmented) + if (spi.getSpiContext().isStopping() || segmented) leaveLatch.countDown(); else { assert reconnector == null; @@ -986,7 +986,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (reconnector.isAlive()) reconnector.cancel(); } - }, adapter.netTimeout); + }, spi.netTimeout); } } } @@ -1008,11 +1008,11 @@ class ClientImpl extends TcpDiscoveryImpl { IgniteSpiException err = null; if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage) - err = adapter.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg); + err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg); else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage) - err = adapter.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg); + err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg); else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) - err = adapter.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); + err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); if (err != null) { joinErr = err; @@ -1052,7 +1052,7 @@ class ClientImpl extends TcpDiscoveryImpl { assert msg != null; assert msg.verified() || msg.senderNodeId() == null; - adapter.stats.onMessageProcessingStarted(msg); + spi.stats.onMessageProcessingStarted(msg); if (msg instanceof TcpDiscoveryNodeAddedMessage) processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); @@ -1073,14 +1073,14 @@ class ClientImpl extends TcpDiscoveryImpl { else if (msg instanceof TcpDiscoveryPingRequest) processPingRequest(); - adapter.stats.onMessageProcessingFinished(msg); + spi.stats.onMessageProcessingFinished(msg); } /** * @param msg Message. */ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { - if (adapter.getSpiContext().isStopping()) + if (spi.getSpiContext().isStopping()) return; TcpDiscoveryNode node = msg.node(); @@ -1092,7 +1092,7 @@ class ClientImpl extends TcpDiscoveryImpl { Collection<TcpDiscoveryNode> top = msg.topology(); if (top != null) { - adapter.gridStartTime = msg.gridStartTime(); + spi.gridStartTime = msg.gridStartTime(); for (TcpDiscoveryNode n : top) { if (n.order() > 0) @@ -1123,7 +1123,7 @@ class ClientImpl extends TcpDiscoveryImpl { Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); if (data != null) - adapter.onExchange(newNodeId, newNodeId, data, null); + spi.onExchange(newNodeId, newNodeId, data, null); } } } @@ -1132,7 +1132,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param msg Message. */ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) { - if (adapter.getSpiContext().isStopping()) + if (spi.getSpiContext().isStopping()) return; if (getLocalNodeId().equals(msg.nodeId())) { @@ -1141,7 +1141,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (dataMap != null) { for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) - adapter.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), null); + spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), null); } locNode.setAttributes(msg.clientNodeAttributes()); @@ -1157,7 +1157,7 @@ class ClientImpl extends TcpDiscoveryImpl { joinLatch.countDown(); - adapter.stats.onJoinFinished(); + spi.stats.onJoinFinished(); } else if (log.isDebugEnabled()) log.debug("Discarding node add finished message (this message has already been processed) " + @@ -1178,8 +1178,8 @@ class ClientImpl extends TcpDiscoveryImpl { node.order(topVer); node.visible(true); - if (adapter.locNodeVer.equals(node.version())) - node.version(adapter.locNodeVer); + if (spi.locNodeVer.equals(node.version())) + node.version(spi.locNodeVer); NavigableSet<ClusterNode> top = updateTopologyHistory(topVer); @@ -1192,7 +1192,7 @@ class ClientImpl extends TcpDiscoveryImpl { notifyDiscovery(EVT_NODE_JOINED, topVer, node, top); - adapter.stats.onNodeJoined(); + spi.stats.onNodeJoined(); } } @@ -1207,7 +1207,7 @@ class ClientImpl extends TcpDiscoveryImpl { leaveLatch.countDown(); } else { - if (adapter.getSpiContext().isStopping()) + if (spi.getSpiContext().isStopping()) return; TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId()); @@ -1230,7 +1230,7 @@ class ClientImpl extends TcpDiscoveryImpl { notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top); - adapter.stats.onNodeLeft(); + spi.stats.onNodeLeft(); } } @@ -1238,7 +1238,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param msg Message. */ private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) { - if (adapter.getSpiContext().isStopping()) { + if (spi.getSpiContext().isStopping()) { if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) { if (leaveLatch.getCount() > 0) { log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId() @@ -1272,7 +1272,7 @@ class ClientImpl extends TcpDiscoveryImpl { notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top); - adapter.stats.onNodeFailed(); + spi.stats.onNodeFailed(); } } @@ -1280,7 +1280,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param msg Message. */ private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) { - if (adapter.getSpiContext().isStopping()) + if (spi.getSpiContext().isStopping()) return; if (getLocalNodeId().equals(msg.creatorNodeId())) { @@ -1314,7 +1314,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param msg Message. */ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) { - if (adapter.getSpiContext().isStopping()) + if (spi.getSpiContext().isStopping()) return; if (getLocalNodeId().equals(msg.creatorNodeId())) { @@ -1346,7 +1346,7 @@ class ClientImpl extends TcpDiscoveryImpl { */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { if (msg.verified() && joinLatch.getCount() == 0) { - DiscoverySpiListener lsnr = adapter.lsnr; + DiscoverySpiListener lsnr = spi.lsnr; if (lsnr != null) { UUID nodeId = msg.creatorNodeId(); @@ -1355,7 +1355,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (node != null && node.visible()) { try { - DiscoverySpiCustomMessage msgObj = msg.message(adapter.marsh); + DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh); notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj); } @@ -1437,7 +1437,7 @@ class ClientImpl extends TcpDiscoveryImpl { */ private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top, @Nullable DiscoverySpiCustomMessage data) { - DiscoverySpiListener lsnr = adapter.lsnr; + DiscoverySpiListener lsnr = spi.lsnr; if (lsnr != null) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0e192ef8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 184895b..a966363 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -230,36 +230,36 @@ class ServerImpl extends TcpDiscoveryImpl { tcpSrvr = new TcpServer(); - adapter.initLocalNode(tcpSrvr.port, true); + spi.initLocalNode(tcpSrvr.port, true); - locNode = adapter.locNode; + locNode = spi.locNode; // Start TCP server thread after local node is initialized. tcpSrvr.start(); ring.localNode(locNode); - if (adapter.ipFinder.isShared()) + if (spi.ipFinder.isShared()) registerLocalNodeAddress(); else { - if (F.isEmpty(adapter.ipFinder.getRegisteredAddresses())) + if (F.isEmpty(spi.ipFinder.getRegisteredAddresses())) throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " + "GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " + "(specify list of IP addresses in configuration)."); - ipFinderHasLocAddr = adapter.ipFinderHasLocalAddress(); + ipFinderHasLocAddr = spi.ipFinderHasLocalAddress(); } - if (adapter.getStatisticsPrintFrequency() > 0 && log.isInfoEnabled()) { + if (spi.getStatisticsPrintFrequency() > 0 && log.isInfoEnabled()) { statsPrinter = new StatisticsPrinter(); statsPrinter.start(); } - adapter.stats.onJoinStarted(); + spi.stats.onJoinStarted(); joinTopology(); - adapter.stats.onJoinFinished(); + spi.stats.onJoinFinished(); hbsSnd = new HeartbeatsSender(); hbsSnd.start(); @@ -267,12 +267,12 @@ class ServerImpl extends TcpDiscoveryImpl { chkStatusSnd = new CheckStatusSender(); chkStatusSnd.start(); - if (adapter.ipFinder.isShared()) { + if (spi.ipFinder.isShared()) { ipFinderCleaner = new IpFinderCleaner(); ipFinderCleaner.start(); } - adapter.printStartInfo(); + spi.printStartInfo(); } /** @@ -283,7 +283,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Make sure address registration succeeded. while (true) { try { - adapter.ipFinder.initializeLocalAddresses(locNode.socketAddresses()); + spi.ipFinder.initializeLocalAddresses(locNode.socketAddresses()); // Success. break; @@ -341,9 +341,9 @@ class ServerImpl extends TcpDiscoveryImpl { msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id())); synchronized (mux) { - long threshold = U.currentTimeMillis() + adapter.netTimeout; + long threshold = U.currentTimeMillis() + spi.netTimeout; - long timeout = adapter.netTimeout; + long timeout = spi.netTimeout; while (spiState != LEFT && timeout > 0) { try { @@ -400,9 +400,9 @@ class ServerImpl extends TcpDiscoveryImpl { Collection<TcpDiscoveryNode> rmts = null; if (!disconnect) - adapter.printStopInfo(); + spi.printStopInfo(); else { - adapter.getSpiContext().deregisterPorts(); + spi.getSpiContext().deregisterPorts(); rmts = ring.visibleRemoteNodes(); } @@ -414,7 +414,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (rmts != null && !rmts.isEmpty()) { // This is restart/disconnection and remote nodes are not empty. // We need to fire FAIL event for each. - DiscoverySpiListener lsnr = adapter.lsnr; + DiscoverySpiListener lsnr = spi.lsnr; if (lsnr != null) { Collection<ClusterNode> processed = new HashSet<>(); @@ -438,7 +438,7 @@ class ServerImpl extends TcpDiscoveryImpl { printStatistics(); - adapter.stats.clear(); + spi.stats.clear(); synchronized (mux) { // Clear stored data. @@ -498,7 +498,7 @@ class ServerImpl extends TcpDiscoveryImpl { return false; } - for (InetSocketAddress addr : adapter.getNodeAddresses(node, U.sameMacs(locNode, node))) { + for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) { try { // ID returned by the node should be the same as ID of the parameter for ping to succeed. IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId); @@ -530,7 +530,7 @@ class ServerImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); - if (F.contains(adapter.locNodeAddrs, addr)) { + if (F.contains(spi.locNodeAddrs, addr)) { if (clientNodeId == null) return F.t(getLocalNodeId(), false); @@ -565,18 +565,18 @@ class ServerImpl extends TcpDiscoveryImpl { try { Socket sock = null; - for (int i = 0; i < adapter.reconCnt; i++) { + for (int i = 0; i < spi.reconCnt; i++) { try { if (addr.isUnresolved()) addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()); long tstamp = U.currentTimeMillis(); - sock = adapter.openSocket(addr); + sock = spi.openSocket(addr); - adapter.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId)); + spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId)); - TcpDiscoveryPingResponse res = adapter.readMessage(sock, null, adapter.netTimeout); + TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -585,7 +585,7 @@ class ServerImpl extends TcpDiscoveryImpl { break; } - adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); + spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists()); @@ -638,7 +638,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { try { - msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, adapter.marsh.marshal(evt))); + msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, spi.marsh.marshal(evt))); } catch (IgniteCheckedException e) { throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); @@ -691,7 +691,7 @@ class ServerImpl extends TcpDiscoveryImpl { Map<String, Object> attrs = new HashMap<>(locNode.attributes()); attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, - adapter.ignite().configuration().getMarshaller().marshal(subj)); + spi.ignite().configuration().getMarshaller().marshal(subj)); attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); locNode.setAttributes(attrs); @@ -704,7 +704,7 @@ class ServerImpl extends TcpDiscoveryImpl { locNode.order(1); locNode.internalOrder(1); - adapter.gridStartTime = U.currentTimeMillis(); + spi.gridStartTime = U.currentTimeMillis(); locNode.visible(true); @@ -729,9 +729,9 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Join request message has been sent (waiting for coordinator response)."); synchronized (mux) { - long threshold = U.currentTimeMillis() + adapter.netTimeout; + long threshold = U.currentTimeMillis() + spi.netTimeout; - long timeout = adapter.netTimeout; + long timeout = spi.netTimeout; while (spiState == CONNECTING && timeout > 0) { try { @@ -749,15 +749,15 @@ class ServerImpl extends TcpDiscoveryImpl { if (spiState == CONNECTED) break; else if (spiState == DUPLICATE_ID) - throw adapter.duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get()); + throw spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get()); else if (spiState == AUTH_FAILED) - throw adapter.authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get()); + throw spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get()); else if (spiState == CHECK_FAILED) - throw adapter.checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get()); + throw spi.checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get()); else if (spiState == LOOPBACK_PROBLEM) { TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get(); - boolean locHostLoopback = adapter.locHost.isLoopbackAddress(); + boolean locHostLoopback = spi.locHost.isLoopbackAddress(); String firstNode = locHostLoopback ? "local" : "remote"; @@ -774,7 +774,7 @@ class ServerImpl extends TcpDiscoveryImpl { "Check remote nodes logs for possible error messages. " + "Note that large topology may require significant time to start. " + "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " + - "if getting this message on the starting nodes [networkTimeout=" + adapter.netTimeout + ']'); + "if getting this message on the starting nodes [networkTimeout=" + spi.netTimeout + ']'); } } @@ -796,13 +796,13 @@ class ServerImpl extends TcpDiscoveryImpl { @SuppressWarnings({"BusyWait"}) private boolean sendJoinRequestMessage() throws IgniteSpiException { TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode, - adapter.collectExchangeData(getLocalNodeId())); + spi.collectExchangeData(getLocalNodeId())); // Time when it has been detected, that addresses from IP finder do not respond. long noResStart = 0; while (true) { - Collection<InetSocketAddress> addrs = adapter.resolvedAddresses(); + Collection<InetSocketAddress> addrs = spi.resolvedAddresses(); if (F.isEmpty(addrs)) return false; @@ -810,7 +810,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean retry = false; Collection<Exception> errs = new ArrayList<>(); - try (SocketMultiConnector multiConnector = new SocketMultiConnector(adapter, addrs, 2)) { + try (SocketMultiConnector multiConnector = new SocketMultiConnector(spi, addrs, 2)) { GridTuple3<InetSocketAddress, Socket, Exception> tuple; while ((tuple = multiConnector.next()) != null) { @@ -897,7 +897,7 @@ class ServerImpl extends TcpDiscoveryImpl { throw new IgniteSpiException("Thread has been interrupted.", e); } } - else if (!adapter.ipFinder.isShared() && !ipFinderHasLocAddr) { + else if (!spi.ipFinder.isShared() && !ipFinderHasLocAddr) { IgniteCheckedException e = null; if (!errs.isEmpty()) { @@ -912,10 +912,10 @@ class ServerImpl extends TcpDiscoveryImpl { "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " + addrs); - if (adapter.joinTimeout > 0) { + if (spi.joinTimeout > 0) { if (noResStart == 0) noResStart = U.currentTimeMillis(); - else if (U.currentTimeMillis() - noResStart > adapter.joinTimeout) + else if (U.currentTimeMillis() - noResStart > spi.joinTimeout) throw new IgniteSpiException( "Failed to connect to any address from IP finder within join timeout " + "(make sure IP finder addresses are correct, and operating system firewalls are disabled " + @@ -952,7 +952,7 @@ class ServerImpl extends TcpDiscoveryImpl { Collection<Throwable> errs = null; - long ackTimeout0 = adapter.ackTimeout; + long ackTimeout0 = spi.ackTimeout; int connectAttempts = 1; @@ -960,7 +960,7 @@ class ServerImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); - for (int i = 0; i < adapter.reconCnt; i++) { + for (int i = 0; i < spi.reconCnt; i++) { // Need to set to false on each new iteration, // since remote node may leave in the middle of the first iteration. joinReqSent = false; @@ -971,14 +971,14 @@ class ServerImpl extends TcpDiscoveryImpl { long tstamp = U.currentTimeMillis(); if (sock == null) - sock = adapter.openSocket(addr); + sock = spi.openSocket(addr); openSock = true; // Handshake. - adapter.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); + spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); - TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, ackTimeout0); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -987,14 +987,14 @@ class ServerImpl extends TcpDiscoveryImpl { break; } - adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); + spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); // Send message. tstamp = U.currentTimeMillis(); - adapter.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg); - adapter.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); if (debugMode) debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + @@ -1009,7 +1009,7 @@ class ServerImpl extends TcpDiscoveryImpl { // E.g. due to class not found issue. joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; - return adapter.readReceipt(sock, ackTimeout0); + return spi.readReceipt(sock, ackTimeout0); } catch (ClassCastException e) { // This issue is rarely reproducible on AmazonEC2, but never @@ -1087,7 +1087,7 @@ class ServerImpl extends TcpDiscoveryImpl { Map<String, Object> attrs = new HashMap<>(node.getAttributes()); attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, - adapter.marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))); + spi.marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))); node.setAttributes(attrs); } @@ -1110,7 +1110,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (credBytes == null) return null; - return adapter.marsh.unmarshal(credBytes, null); + return spi.marsh.unmarshal(credBytes, null); } catch (IgniteCheckedException e) { throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e); @@ -1123,10 +1123,10 @@ class ServerImpl extends TcpDiscoveryImpl { * maximum acknowledgement timeout, {@code false} otherwise. */ private boolean checkAckTimeout(long ackTimeout) { - if (ackTimeout > adapter.maxAckTimeout) { + if (ackTimeout > spi.maxAckTimeout) { LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " + "(consider increasing 'maxAckTimeout' configuration property) " + - "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + adapter.maxAckTimeout + ']'); + "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']'); return false; } @@ -1145,7 +1145,7 @@ class ServerImpl extends TcpDiscoveryImpl { assert type > 0; assert node != null; - DiscoverySpiListener lsnr = adapter.lsnr; + DiscoverySpiListener lsnr = spi.lsnr; TcpDiscoverySpiState spiState = spiStateCopy(); @@ -1179,7 +1179,7 @@ class ServerImpl extends TcpDiscoveryImpl { topHist.put(topVer, top); - while (topHist.size() > adapter.topHistSize) + while (topHist.size() > spi.topHistSize) topHist.remove(topHist.firstKey()); if (log.isDebugEnabled()) @@ -1200,7 +1200,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean crd = spiState == CONNECTED && locNode.equals(resolveCoordinator()); if (crd) - adapter.stats.onBecomingCoordinator(); + spi.stats.onBecomingCoordinator(); return crd; } @@ -1254,7 +1254,7 @@ class ServerImpl extends TcpDiscoveryImpl { * Prints SPI statistics. */ private void printStatistics() { - if (log.isInfoEnabled() && adapter.statsPrintFreq > 0) { + if (log.isInfoEnabled() && spi.statsPrintFreq > 0) { int failedNodesSize; int leavingNodesSize; @@ -1267,7 +1267,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryNode coord = resolveCoordinator(); - log.info("Discovery SPI statistics [statistics=" + adapter.stats + ", spiState=" + spiStateCopy() + + log.info("Discovery SPI statistics [statistics=" + spi.stats + ", spiState=" + spiStateCopy() + ", coord=" + coord + ", topSize=" + ring.allNodes().size() + ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize + @@ -1439,7 +1439,7 @@ class ServerImpl extends TcpDiscoveryImpl { b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl()); - b.append(" Socket timeout worker: ").append(threadStatus(adapter.sockTimeoutWorker)).append(U.nl()); + b.append(" Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl()); b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl()); b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl()); @@ -1473,7 +1473,7 @@ class ServerImpl extends TcpDiscoveryImpl { b.append(U.nl()); - b.append("Stats: ").append(adapter.stats).append(U.nl()); + b.append("Stats: ").append(spi.stats).append(U.nl()); U.quietAndInfo(log, b.toString()); } @@ -1548,9 +1548,9 @@ class ServerImpl extends TcpDiscoveryImpl { * Constructor. */ private HeartbeatsSender() { - super(adapter.ignite().name(), "tcp-disco-hb-sender", log); + super(spi.ignite().name(), "tcp-disco-hb-sender", log); - setPriority(adapter.threadPri); + setPriority(spi.threadPri); } /** {@inheritDoc} */ @@ -1576,7 +1576,7 @@ class ServerImpl extends TcpDiscoveryImpl { msgWorker.addMessage(msg); - Thread.sleep(adapter.hbFreq); + Thread.sleep(spi.hbFreq); } } } @@ -1592,9 +1592,9 @@ class ServerImpl extends TcpDiscoveryImpl { * Constructor. */ private CheckStatusSender() { - super(adapter.ignite().name(), "tcp-disco-status-check-sender", log); + super(spi.ignite().name(), "tcp-disco-status-check-sender", log); - setPriority(adapter.threadPri); + setPriority(spi.threadPri); } /** {@inheritDoc} */ @@ -1604,7 +1604,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Status check sender has been started."); // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm. - long checkTimeout = (long)adapter.maxMissedHbs * adapter.hbFreq + 50; + long checkTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50; long lastSent = 0; @@ -1656,9 +1656,9 @@ class ServerImpl extends TcpDiscoveryImpl { * Constructor. */ private IpFinderCleaner() { - super(adapter.ignite().name(), "tcp-disco-ip-finder-cleaner", log); + super(spi.ignite().name(), "tcp-disco-ip-finder-cleaner", log); - setPriority(adapter.threadPri); + setPriority(spi.threadPri); } /** {@inheritDoc} */ @@ -1668,7 +1668,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("IP finder cleaner has been started."); while (!isInterrupted()) { - Thread.sleep(adapter.ipFinderCleanFreq); + Thread.sleep(spi.ipFinderCleanFreq); if (!isLocalNodeCoordinator()) continue; @@ -1680,7 +1680,7 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - if (adapter.ipFinder.isShared()) + if (spi.ipFinder.isShared()) cleanIpFinder(); } } @@ -1689,7 +1689,7 @@ class ServerImpl extends TcpDiscoveryImpl { * Cleans IP finder. */ private void cleanIpFinder() { - assert adapter.ipFinder.isShared(); + assert spi.ipFinder.isShared(); try { // Addresses that belongs to nodes in topology. @@ -1698,7 +1698,7 @@ class ServerImpl extends TcpDiscoveryImpl { ring.allNodes(), new C1<TcpDiscoveryNode, Collection<InetSocketAddress>>() { @Override public Collection<InetSocketAddress> apply(TcpDiscoveryNode node) { - return !node.isClient() ? adapter.getNodeAddresses(node) : + return !node.isClient() ? spi.getNodeAddresses(node) : Collections.<InetSocketAddress>emptyList(); } } @@ -1706,7 +1706,7 @@ class ServerImpl extends TcpDiscoveryImpl { ); // Addresses registered in IP finder. - Collection<InetSocketAddress> regAddrs = adapter.registeredAddresses(); + Collection<InetSocketAddress> regAddrs = spi.registeredAddresses(); // Remove all addresses that belong to alive nodes, leave dead-node addresses. Collection<InetSocketAddress> rmvAddrs = F.view( @@ -1742,7 +1742,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Unregister dead-nodes addresses. if (!rmvAddrs.isEmpty()) { - adapter.ipFinder.unregisterAddresses(rmvAddrs); + spi.ipFinder.unregisterAddresses(rmvAddrs); if (log.isDebugEnabled()) log.debug("Unregistered addresses from IP finder: " + rmvAddrs); @@ -1756,7 +1756,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Re-register missing addresses. if (!missingAddrs.isEmpty()) { - adapter.ipFinder.registerAddresses(missingAddrs); + spi.ipFinder.registerAddresses(missingAddrs); if (log.isDebugEnabled()) log.debug("Registered missing addresses in IP finder: " + missingAddrs); @@ -1897,7 +1897,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode) debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); - adapter.stats.onMessageProcessingStarted(msg); + spi.stats.onMessageProcessingStarted(msg); if (msg instanceof TcpDiscoveryJoinRequestMessage) processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); @@ -1938,7 +1938,7 @@ class ServerImpl extends TcpDiscoveryImpl { else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); - adapter.stats.onMessageProcessingFinished(msg); + spi.stats.onMessageProcessingFinished(msg); } /** @@ -1952,7 +1952,7 @@ class ServerImpl extends TcpDiscoveryImpl { assert ring.hasRemoteNodes(); - for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : adapter.sendMsgLsnrs) + for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs) msgLsnr.apply(msg); if (redirectToClients(msg)) { @@ -1964,9 +1964,9 @@ class ServerImpl extends TcpDiscoveryImpl { try { if (marshalledMsg == null) - marshalledMsg = adapter.marsh.marshal(msg); + marshalledMsg = spi.marsh.marshal(msg); - msgClone = adapter.marsh.unmarshal(marshalledMsg, null); + msgClone = spi.marsh.unmarshal(marshalledMsg, null); } catch (IgniteCheckedException e) { U.error(log, "Failed to marshal message: " + msg, e); @@ -2043,8 +2043,8 @@ class ServerImpl extends TcpDiscoveryImpl { List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses()); - addr: for (InetSocketAddress addr : adapter.getNodeAddresses(next, sameHost)) { - long ackTimeout0 = adapter.ackTimeout; + addr: for (InetSocketAddress addr : spi.getNodeAddresses(next, sameHost)) { + long ackTimeout0 = spi.ackTimeout; if (locNodeAddrs.contains(addr)){ if (log.isDebugEnabled()) @@ -2054,7 +2054,7 @@ class ServerImpl extends TcpDiscoveryImpl { continue; } - for (int i = 0; i < adapter.reconCnt; i++) { + for (int i = 0; i < spi.reconCnt; i++) { if (sock == null) { nextNodeExists = false; @@ -2066,14 +2066,14 @@ class ServerImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = adapter.openSocket(addr); + sock = spi.openSocket(addr); openSock = true; // Handshake. writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); - TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, ackTimeout0); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -2086,7 +2086,7 @@ class ServerImpl extends TcpDiscoveryImpl { break; } - adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); + spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); UUID nextId = res.creatorNodeId(); @@ -2214,9 +2214,9 @@ class ServerImpl extends TcpDiscoveryImpl { clearNodeAddedMessage(pendingMsg); } - adapter.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp); + spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp); - int res = adapter.readReceipt(sock, ackTimeout0); + int res = spi.readReceipt(sock, ackTimeout0); if (log.isDebugEnabled()) log.debug("Pending message has been sent to next node [msg=" + msg.id() + @@ -2237,9 +2237,9 @@ class ServerImpl extends TcpDiscoveryImpl { writeToSocket(sock, msg); - adapter.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); - int res = adapter.readReceipt(sock, ackTimeout0); + int res = spi.readReceipt(sock, ackTimeout0); if (log.isDebugEnabled()) log.debug("Message has been sent to next node [msg=" + msg + @@ -2382,10 +2382,10 @@ class ServerImpl extends TcpDiscoveryImpl { private void registerPendingMessage(TcpDiscoveryAbstractMessage msg) { assert msg != null; - if (adapter.ensured(msg)) { + if (spi.ensured(msg)) { pendingMsgs.add(msg); - adapter.stats.onPendingMessageRegistered(); + spi.stats.onPendingMessageRegistered(); if (log.isDebugEnabled()) log.debug("Pending message has been registered: " + msg.id()); @@ -2411,7 +2411,7 @@ class ServerImpl extends TcpDiscoveryImpl { // This check is performed by the node joining node is connected to, but not by coordinator // because loopback problem message is sent directly to the joining node which may be unavailable // if coordinator resides on another host. - if (adapter.locHost.isLoopbackAddress() != rmtHostLoopback) { + if (spi.locHost.isLoopbackAddress() != rmtHostLoopback) { String firstNode = rmtHostLoopback ? "remote" : "local"; String secondNode = rmtHostLoopback ? "local" : "remote"; @@ -2510,7 +2510,7 @@ class ServerImpl extends TcpDiscoveryImpl { try { trySendMessageDirectly(node, new TcpDiscoveryAuthFailedMessage(locNodeId, - adapter.locHost)); + spi.locHost)); } catch (IgniteSpiException e) { if (log.isDebugEnabled()) @@ -2541,7 +2541,7 @@ class ServerImpl extends TcpDiscoveryImpl { try { trySendMessageDirectly(node, new TcpDiscoveryAuthFailedMessage(locNodeId, - adapter.locHost)); + spi.locHost)); } catch (IgniteSpiException e) { if (log.isDebugEnabled()) @@ -2557,7 +2557,7 @@ class ServerImpl extends TcpDiscoveryImpl { Map<String, Object> attrs = new HashMap<>(node.getAttributes()); attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, - adapter.ignite().configuration().getMarshaller().marshal(subj)); + spi.ignite().configuration().getMarshaller().marshal(subj)); node.setAttributes(attrs); } @@ -2578,7 +2578,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - IgniteNodeValidationResult err = adapter.getSpiContext().validateNode(node); + IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node); if (err != null) { boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId()); @@ -2666,7 +2666,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Internal order has been assigned to node: " + node); TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId, - node, msg.discoveryData(), adapter.gridStartTime); + node, msg.discoveryData(), spi.gridStartTime); nodeAddedMsg.client(msg.client()); @@ -2711,7 +2711,7 @@ class ServerImpl extends TcpDiscoveryImpl { IgniteSpiException ex = null; - for (InetSocketAddress addr : adapter.getNodeAddresses(node, U.sameMacs(locNode, node))) { + for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) { try { sendMessageDirectly(msg, addr, null); @@ -2760,7 +2760,7 @@ class ServerImpl extends TcpDiscoveryImpl { assert node.isClient(); node.clientRouterNodeId(msg.routerNodeId()); - node.aliveCheck(adapter.maxMissedClientHbs); + node.aliveCheck(spi.maxMissedClientHbs); if (isLocalNodeCoordinator()) { Collection<TcpDiscoveryAbstractMessage> pending = @@ -2826,7 +2826,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (isLocalNodeCoordinator()) { if (msg.verified()) { - adapter.stats.onRingMessageReceived(msg); + spi.stats.onRingMessageReceived(msg); TcpDiscoveryNodeAddFinishedMessage addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(locNodeId, node.id()); @@ -2882,7 +2882,7 @@ class ServerImpl extends TcpDiscoveryImpl { else { SecurityContext subj = nodeAuth.authenticateNode(node, cred); - SecurityContext coordSubj = adapter.ignite().configuration().getMarshaller().unmarshal( + SecurityContext coordSubj = spi.ignite().configuration().getMarshaller().unmarshal( node.<byte[]>attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT), U.gridClassLoader()); @@ -2911,7 +2911,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (authFailed) { try { trySendMessageDirectly(node, new TcpDiscoveryAuthFailedMessage(locNodeId, - adapter.locHost)); + spi.locHost)); } catch (IgniteSpiException e) { if (log.isDebugEnabled()) @@ -2929,7 +2929,7 @@ class ServerImpl extends TcpDiscoveryImpl { } if (msg.client()) - node.aliveCheck(adapter.maxMissedClientHbs); + node.aliveCheck(spi.maxMissedClientHbs); boolean topChanged = ring.add(node); @@ -2939,9 +2939,9 @@ class ServerImpl extends TcpDiscoveryImpl { Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); if (data != null) - adapter.onExchange(node.id(), node.id(), data, U.gridClassLoader()); + spi.onExchange(node.id(), node.id(), data, U.gridClassLoader()); - msg.addDiscoveryData(locNodeId, adapter.collectExchangeData(node.id())); + msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id())); } if (log.isDebugEnabled()) @@ -2959,7 +2959,7 @@ class ServerImpl extends TcpDiscoveryImpl { Collection<TcpDiscoveryNode> top = msg.topology(); if (top != null && !top.isEmpty()) { - adapter.gridStartTime = msg.gridStartTime(); + spi.gridStartTime = msg.gridStartTime(); for (TcpDiscoveryNode n : top) { // Make all preceding nodes and local node visible. @@ -3011,7 +3011,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Notify outside of synchronized block. if (dataMap != null) { for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) - adapter.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader()); + spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader()); } } @@ -3050,7 +3050,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (locNodeCoord) { if (msg.verified()) { - adapter.stats.onRingMessageReceived(msg); + spi.stats.onRingMessageReceived(msg); addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); @@ -3088,15 +3088,15 @@ class ServerImpl extends TcpDiscoveryImpl { } if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) { - adapter.stats.onNodeJoined(); + spi.stats.onNodeJoined(); // Make sure that node with greater order will never get EVT_NODE_JOINED // on node with less order. assert node.internalOrder() > locNode.internalOrder() : "Invalid order [node=" + node + ", locNode=" + locNode + ", msg=" + msg + ", ring=" + ring + ']'; - if (adapter.locNodeVer.equals(node.version())) - node.version(adapter.locNodeVer); + if (spi.locNodeVer.equals(node.version())) + node.version(spi.locNodeVer); if (!locNodeCoord) { boolean b = ring.topologyVersion(topVer); @@ -3113,8 +3113,8 @@ class ServerImpl extends TcpDiscoveryImpl { notifyDiscovery(EVT_NODE_JOINED, topVer, node); try { - if (adapter.ipFinder.isShared() && locNodeCoord) - adapter.ipFinder.registerAddresses(node.socketAddresses()); + if (spi.ipFinder.isShared() && locNodeCoord) + spi.ipFinder.registerAddresses(node.socketAddresses()); } catch (IgniteSpiException e) { if (log.isDebugEnabled()) @@ -3172,9 +3172,9 @@ class ServerImpl extends TcpDiscoveryImpl { } if (msg.verified() || !ring.hasRemoteNodes() || msg.senderNodeId() != null) { - if (adapter.ipFinder.isShared() && !ring.hasRemoteNodes()) { + if (spi.ipFinder.isShared() && !ring.hasRemoteNodes()) { try { - adapter.ipFinder.unregisterAddresses(locNode.socketAddresses()); + spi.ipFinder.unregisterAddresses(locNode.socketAddresses()); } catch (IgniteSpiException e) { U.error(log, "Failed to unregister local node address from IP finder.", e); @@ -3222,7 +3222,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (locNodeCoord) { if (msg.verified()) { - adapter.stats.onRingMessageReceived(msg); + spi.stats.onRingMessageReceived(msg); addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); @@ -3297,7 +3297,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - adapter.stats.onNodeLeft(); + spi.stats.onNodeLeft(); notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode); @@ -3392,7 +3392,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (locNodeCoord) { if (msg.verified()) { - adapter.stats.onRingMessageReceived(msg); + spi.stats.onRingMessageReceived(msg); addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); @@ -3442,7 +3442,7 @@ class ServerImpl extends TcpDiscoveryImpl { notifyDiscovery(EVT_NODE_FAILED, topVer, node); - adapter.stats.onNodeFailed(); + spi.stats.onNodeFailed(); } if (ring.hasRemoteNodes()) @@ -3535,7 +3535,7 @@ class ServerImpl extends TcpDiscoveryImpl { } if (locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null && - U.currentTimeMillis() - locNode.lastUpdateTime() < adapter.hbFreq) { + U.currentTimeMillis() - locNode.lastUpdateTime() < spi.hbFreq) { if (log.isDebugEnabled()) log.debug("Status check message discarded (local node receives updates)."); @@ -3643,8 +3643,8 @@ class ServerImpl extends TcpDiscoveryImpl { if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null || !hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) { // Message is on its first ring or just created on coordinator. - msg.setMetrics(locNodeId, adapter.metricsProvider.metrics()); - msg.setCacheMetrics(locNodeId, adapter.metricsProvider.cacheMetrics()); + msg.setMetrics(locNodeId, spi.metricsProvider.metrics()); + msg.setCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics()); for (Map.Entry<UUID, ClientMessageWorker> e : clientMsgWorkers.entrySet()) { UUID nodeId = e.getKey(); @@ -3665,7 +3665,7 @@ class ServerImpl extends TcpDiscoveryImpl { for (TcpDiscoveryNode clientNode : ring.clientNodes()) { if (clientNode.visible()) { if (clientNodeIds.contains(clientNode.id())) - clientNode.aliveCheck(adapter.maxMissedClientHbs); + clientNode.aliveCheck(spi.maxMissedClientHbs); else { int aliveCheck = clientNode.decrementAliveCheck(); @@ -3815,12 +3815,12 @@ class ServerImpl extends TcpDiscoveryImpl { if (sndNext && ring.hasRemoteNodes()) sendMessageAcrossRing(msg); else { - adapter.stats.onRingMessageReceived(msg); + spi.stats.onRingMessageReceived(msg); DiscoverySpiCustomMessage msgObj = null; try { - msgObj = msg.message(adapter.marsh); + msgObj = msg.message(spi.marsh); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); @@ -3832,7 +3832,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (nextMsg != null) { try { addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg, - adapter.marsh.marshal(nextMsg))); + spi.marsh.marshal(nextMsg))); } catch (IgniteCheckedException e) { U.error(log, "Failed to marshal discovery custom message.", e); @@ -3856,7 +3856,7 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Custom message. */ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) { - DiscoverySpiListener lsnr = adapter.lsnr; + DiscoverySpiListener lsnr = spi.lsnr; TcpDiscoverySpiState spiState = spiStateCopy(); @@ -3873,7 +3873,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (node != null) { try { - DiscoverySpiCustomMessage msgObj = msg.message(adapter.marsh); + DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh); lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, msg.topologyVersion(), @@ -3883,7 +3883,7 @@ class ServerImpl extends TcpDiscoveryImpl { msgObj); if (msgObj.isMutable()) - msg.message(msgObj, adapter.marsh.marshal(msgObj)); + msg.message(msgObj, spi.marsh.marshal(msgObj)); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); @@ -3912,35 +3912,35 @@ class ServerImpl extends TcpDiscoveryImpl { * @throws IgniteSpiException In case of error. */ TcpServer() throws IgniteSpiException { - super(adapter.ignite().name(), "tcp-disco-srvr", log); + super(spi.ignite().name(), "tcp-disco-srvr", log); - setPriority(adapter.threadPri); + setPriority(spi.threadPri); - for (port = adapter.locPort; port < adapter.locPort + adapter.locPortRange; port++) { + for (port = spi.locPort; port < spi.locPort + spi.locPortRange; port++) { try { - srvrSock = new ServerSocket(port, 0, adapter.locHost); + srvrSock = new ServerSocket(port, 0, spi.locHost); break; } catch (IOException e) { - if (port < adapter.locPort + adapter.locPortRange - 1) { + if (port < spi.locPort + spi.locPortRange - 1) { if (log.isDebugEnabled()) log.debug("Failed to bind to local port (will try next port within range) " + - "[port=" + port + ", localHost=" + adapter.locHost + ']'); + "[port=" + port + ", localHost=" + spi.locHost + ']'); onException("Failed to bind to local port. " + - "[port=" + port + ", localHost=" + adapter.locHost + ']', e); + "[port=" + port + ", localHost=" + spi.locHost + ']', e); } else { throw new IgniteSpiException("Failed to bind TCP server socket (possibly all ports in range " + - "are in use) [firstPort=" + adapter.locPort + ", lastPort=" + (adapter.locPort + adapter.locPortRange - 1) + - ", addr=" + adapter.locHost + ']', e); + "are in use) [firstPort=" + spi.locPort + ", lastPort=" + (spi.locPort + spi.locPortRange - 1) + + ", addr=" + spi.locHost + ']', e); } } } if (log.isInfoEnabled()) - log.info("Successfully bound to TCP port [port=" + port + ", localHost=" + adapter.locHost + ']'); + log.info("Successfully bound to TCP port [port=" + port + ", localHost=" + spi.locHost + ']'); } /** {@inheritDoc} */ @@ -3962,7 +3962,7 @@ class ServerImpl extends TcpDiscoveryImpl { reader.start(); } - adapter.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp); + spi.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp); } } catch (IOException e) { @@ -4007,13 +4007,13 @@ class ServerImpl extends TcpDiscoveryImpl { * @param sock Socket to read data from. */ SocketReader(Socket sock) { - super(adapter.ignite().name(), "tcp-disco-sock-reader", log); + super(spi.ignite().name(), "tcp-disco-sock-reader", log); this.sock = sock; - setPriority(adapter.threadPri); + setPriority(spi.threadPri); - adapter.stats.onSocketReaderCreated(); + spi.stats.onSocketReaderCreated(); } /** {@inheritDoc} */ @@ -4032,9 +4032,9 @@ class ServerImpl extends TcpDiscoveryImpl { int timeout = sock.getSoTimeout(); - sock.setSoTimeout((int)adapter.netTimeout); + sock.setSoTimeout((int)spi.netTimeout); - for (IgniteInClosure<Socket> connLsnr : adapter.incomeConnLsnrs) + for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs) connLsnr.apply(sock); in = new BufferedInputStream(sock.getInputStream()); @@ -4077,11 +4077,11 @@ class ServerImpl extends TcpDiscoveryImpl { // Restore timeout. sock.setSoTimeout(timeout); - TcpDiscoveryAbstractMessage msg = adapter.readMessage(sock, in, adapter.netTimeout); + TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout); // Ping. if (msg instanceof TcpDiscoveryPingRequest) { - if (!adapter.isNodeStopping0()) { + if (!spi.isNodeStopping0()) { TcpDiscoveryPingRequest req = (TcpDiscoveryPingRequest)msg; TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId); @@ -4093,7 +4093,7 @@ class ServerImpl extends TcpDiscoveryImpl { res.clientExists(clientWorker.ping()); } - adapter.writeToSocket(sock, res); + spi.writeToSocket(sock, res); } else if (log.isDebugEnabled()) log.debug("Ignore ping request, node is stopping."); @@ -4111,7 +4111,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryHandshakeResponse res = new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder()); - adapter.writeToSocket(sock, res); + spi.writeToSocket(sock, res); // It can happen if a remote node is stopped and it has a loopback address in the list of addresses, // the local node sends a handshake request message on the loopback address, so we get here. @@ -4169,7 +4169,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (e.hasCause(SocketTimeoutException.class)) LT.warn(log, null, "Socket operation timed out on handshake " + "(consider increasing 'networkTimeout' configuration property) " + - "[netTimeout=" + adapter.netTimeout + ']'); + "[netTimeout=" + spi.netTimeout + ']'); else if (e.hasCause(ClassNotFoundException.class)) LT.warn(log, null, "Failed to read message due to ClassNotFoundException " + @@ -4187,14 +4187,14 @@ class ServerImpl extends TcpDiscoveryImpl { while (!isInterrupted()) { try { - TcpDiscoveryAbstractMessage msg = adapter.marsh.unmarshal(in, U.gridClassLoader()); + TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); msg.senderNodeId(nodeId); if (log.isDebugEnabled()) log.debug("Message has been received: " + msg); - adapter.stats.onMessageReceived(msg); + spi.stats.onMessageReceived(msg); if (debugMode && recordable(msg)) debugLog("Message has been received: " + msg); @@ -4217,14 +4217,14 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state = spiStateCopy(); if (state == CONNECTED) { - adapter.writeToSocket(sock, RES_OK); + spi.writeToSocket(sock, RES_OK); msgWorker.addMessage(msg); continue; } else { - adapter.writeToSocket(sock, RES_CONTINUE_JOIN); + spi.writeToSocket(sock, RES_CONTINUE_JOIN); break; } @@ -4232,7 +4232,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryDuplicateIdMessage) { // Send receipt back. - adapter.writeToSocket(sock, RES_OK); + spi.writeToSocket(sock, RES_OK); boolean ignored = false; @@ -4261,7 +4261,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryAuthFailedMessage) { // Send receipt back. - adapter.writeToSocket(sock, RES_OK); + spi.writeToSocket(sock, RES_OK); boolean ignored = false; @@ -4290,7 +4290,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryCheckFailedMessage) { // Send receipt back. - adapter.writeToSocket(sock, RES_OK); + spi.writeToSocket(sock, RES_OK); boolean ignored = false; @@ -4319,7 +4319,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) { // Send receipt back. - adapter.writeToSocket(sock, RES_OK); + spi.writeToSocket(sock, RES_OK); boolean ignored = false; @@ -4361,7 +4361,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Send receipt back. if (clientMsgWrk == null) - adapter.writeToSocket(sock, RES_OK); + spi.writeToSocket(sock, RES_OK); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -4461,7 +4461,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state = spiStateCopy(); if (state == CONNECTED) { - adapter.writeToSocket(sock, RES_OK); + spi.writeToSocket(sock, RES_OK); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']'); @@ -4473,7 +4473,7 @@ class ServerImpl extends TcpDiscoveryImpl { return true; } else { - adapter.stats.onMessageProcessingStarted(msg); + spi.stats.onMessageProcessingStarted(msg); Integer res; @@ -4492,14 +4492,14 @@ class ServerImpl extends TcpDiscoveryImpl { // Local node is stopping. Remote node should try next one. res = RES_CONTINUE_JOIN; - adapter.writeToSocket(sock, res); + spi.writeToSocket(sock, res); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']'); fromAddrs.addAll(msg.node().socketAddresses()); - adapter.stats.onMessageProcessingFinished(msg); + spi.stats.onMessageProcessingFinished(msg); return false; } @@ -4522,7 +4522,7 @@ class ServerImpl extends TcpDiscoveryImpl { readers.remove(this); } - adapter.stats.onSocketReaderRemoved(); + spi.stats.onSocketReaderRemoved(); } /** {@inheritDoc} */ @@ -4539,13 +4539,13 @@ class ServerImpl extends TcpDiscoveryImpl { * Constructor. */ StatisticsPrinter() { - super(adapter.ignite().name(), "tcp-disco-stats-printer", log); + super(spi.ignite().name(), "tcp-disco-stats-printer", log); - assert adapter.statsPrintFreq > 0; + assert spi.statsPrintFreq > 0; assert log.isInfoEnabled(); - setPriority(adapter.threadPri); + setPriority(spi.threadPri); } /** {@inheritDoc} */ @@ -4555,7 +4555,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Statistics printer has been started."); while (!isInterrupted()) { - Thread.sleep(adapter.statsPrintFreq); + Thread.sleep(spi.statsPrintFreq); printStatistics(); } @@ -4650,7 +4650,7 @@ class ServerImpl extends TcpDiscoveryImpl { * */ public boolean ping() throws InterruptedException { - if (adapter.isNodeStopping0()) + if (spi.isNodeStopping0()) return false; GridFutureAdapter<Boolean> fut; @@ -4675,7 +4675,7 @@ class ServerImpl extends TcpDiscoveryImpl { } try { - return fut.get(adapter.ackTimeout, TimeUnit.MILLISECONDS); + return fut.get(spi.ackTimeout, TimeUnit.MILLISECONDS); } catch (IgniteInterruptedCheckedException ignored) { throw new InterruptedException(); @@ -4718,9 +4718,9 @@ class ServerImpl extends TcpDiscoveryImpl { * @param name Thread name. */ protected MessageWorkerAdapter(String name) { - super(adapter.ignite().name(), name, log); + super(spi.ignite().name(), name, log); - setPriority(adapter.threadPri); + setPriority(spi.threadPri); } /** {@inheritDoc} */ @@ -4787,7 +4787,7 @@ class ServerImpl extends TcpDiscoveryImpl { throws IOException, IgniteCheckedException { bout.reset(); - adapter.writeToSocket(sock, msg, bout); + spi.writeToSocket(sock, msg, bout); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0e192ef8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index 8dad92a..4836911 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -40,7 +40,7 @@ abstract class TcpDiscoveryImpl { protected static final int RES_WAIT = 200; /** */ - protected final TcpDiscoverySpi adapter; + protected final TcpDiscoverySpi spi; /** */ protected final IgniteLogger log; @@ -49,19 +49,19 @@ abstract class TcpDiscoveryImpl { protected TcpDiscoveryNode locNode; /** - * @param adapter Adapter. + * @param spi Adapter. */ - TcpDiscoveryImpl(TcpDiscoverySpi adapter) { - this.adapter = adapter; + TcpDiscoveryImpl(TcpDiscoverySpi spi) { + this.spi = spi; - log = adapter.log; + log = spi.log; } /** * */ public UUID getLocalNodeId() { - return adapter.getLocalNodeId(); + return spi.getLocalNodeId(); } /** @@ -69,7 +69,7 @@ abstract class TcpDiscoveryImpl { * @param e Exception. */ protected void onException(String msg, Exception e){ - adapter.getExceptionRegistry().onException(msg, e); + spi.getExceptionRegistry().onException(msg, e); } /**