ignite-621 fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/65f34729 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/65f34729 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/65f34729 Branch: refs/heads/ignite-30 Commit: 65f347299c8699ec8b43dad8ec63a40758677dd2 Parents: 2444811 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Tue Apr 7 17:42:04 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Tue Apr 7 17:42:04 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 24 +++++- .../communication/tcp/TcpCommunicationSpi.java | 14 ++-- .../discovery/tcp/TcpClientDiscoverySpi.java | 47 ++++++------ .../spi/discovery/tcp/TcpDiscoverySpi.java | 78 ++++++++++---------- .../discovery/tcp/TcpDiscoverySpiAdapter.java | 8 +- .../spi/swapspace/file/FileSwapSpaceSpi.java | 4 +- .../GridAbstractCommunicationSelfTest.java | 3 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 3 +- ...cpCommunicationSpiMultithreadedSelfTest.java | 3 +- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 3 +- ...GridTcpCommunicationSpiRecoverySelfTest.java | 2 +- 11 files changed, 104 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65f34729/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 65303db..f198210 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -54,10 +54,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement @LoggerResource private IgniteLogger log; - /** Ignite instance */ - @IgniteInstanceResource + /** Ignite instance. */ protected Ignite ignite; + /** Local node id. */ + protected UUID nodeId; + + /** Grid instance name. */ + protected String gridName; + /** SPI name. */ private String name; @@ -105,7 +110,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** {@inheritDoc} */ @Override public UUID getLocalNodeId() { - return ignite.configuration().getNodeId(); + return nodeId; } /** {@inheritDoc} */ @@ -189,6 +194,19 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement } /** + * Inject ignite instance. + */ + @IgniteInstanceResource + protected void injectResources(Ignite ignite){ + this.ignite = ignite; + + if (ignite != null) { + nodeId = ignite.configuration().getNodeId(); + gridName = ignite.name(); + } + } + + /** * Method to be called in the beginning of onContextDestroyed() method. */ protected void onContextDestroyed0() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65f34729/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 613ff64..6cd439b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -591,9 +591,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Local port range. */ private int locPortRange = DFLT_PORT_RANGE; - /** Grid name. */ - private String gridName; - /** Allocate direct buffer or heap buffer. */ private boolean directBuf = true; @@ -742,10 +739,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ @IgniteInstanceResource protected void injectResources(Ignite ignite) { + super.injectResources(ignite); + if (ignite != null) { setAddressResolver(ignite.configuration().getAddressResolver()); setLocalAddress(ignite.configuration().getLocalHost()); - gridName = ignite.name(); } } @@ -1189,7 +1187,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { - nodeIdMsg = new NodeIdMessage(ignite.configuration().getNodeId()); + nodeIdMsg = new NodeIdMessage(getLocalNodeId()); assertParameter(locPort > 1023, "locPort > 1023"); assertParameter(locPort <= 0xffff, "locPort < 0xffff"); @@ -1557,7 +1555,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isTraceEnabled()) log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']'); - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); if (node.id().equals(locNodeId)) notifyListener(locNodeId, msg, NOOP); @@ -1957,7 +1955,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); if (recovery != null) { - HandshakeMessage msg = new HandshakeMessage(ignite.configuration().getNodeId(), + HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(), recovery.incrementConnectCount(), recovery.receivedCount()); @@ -2630,7 +2628,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter out.flush(); if (log.isDebugEnabled()) - log.debug("Sent local node ID [locNodeId=" + ignite.configuration().getNodeId() + ", rmtNodeId=" + log.debug("Sent local node ID [locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']'); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65f34729/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 48125bf..bf69efb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -255,7 +255,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } locNode = new TcpDiscoveryNode( - ignite.configuration().getNodeId(), + getLocalNodeId(), addrs.get1(), addrs.get2(), 0, @@ -295,7 +295,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp leaveLatch = new CountDownLatch(1); try { - TcpDiscoveryNodeLeftMessage msg = new TcpDiscoveryNodeLeftMessage(ignite.configuration().getNodeId()); + TcpDiscoveryNodeLeftMessage msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); msg.client(true); @@ -344,7 +344,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp /** {@inheritDoc} */ @Nullable @Override public ClusterNode getNode(UUID nodeId) { - if (ignite.configuration().getNodeId().equals(nodeId)) + if (getLocalNodeId().equals(nodeId)) return locNode; TcpDiscoveryNode node = rmtNodes.get(nodeId); @@ -356,7 +356,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp @Override public boolean pingNode(UUID nodeId) { assert nodeId != null; - if (nodeId.equals(ignite.configuration().getNodeId())) + if (nodeId.equals(getLocalNodeId())) return true; TcpDiscoveryNode node = rmtNodes.get(nodeId); @@ -427,7 +427,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp locNode.clientRouterNodeId(rmtNodeId); TcpDiscoveryAbstractMessage msg = recon ? - new TcpDiscoveryClientReconnectMessage(ignite.configuration().getNodeId(), rmtNodeId, + new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId) : new TcpDiscoveryJoinRequestMessage(locNode, null); @@ -547,7 +547,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp Socket sock = openSocket(addr); - TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(ignite.configuration().getNodeId()); + TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId()); req.client(true); @@ -558,7 +558,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp UUID nodeId = res.creatorNodeId(); assert nodeId != null; - assert !ignite.configuration().getNodeId().equals(nodeId); + assert !getLocalNodeId().equals(nodeId); return F.t(sock, nodeId); } @@ -567,7 +567,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp * FOR TEST PURPOSE ONLY! */ void simulateNodeFailure() { - U.warn(log, "Simulating client node failure: " + ignite.configuration().getNodeId()); + U.warn(log, "Simulating client node failure: " + getLocalNodeId()); U.closeQuiet(sock); @@ -591,7 +591,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp /** */ protected DisconnectHandler() { - super(ignite.name(), "tcp-client-disco-disconnect-hnd", log); + super(gridName, "tcp-client-disco-disconnect-hnd", log); } /** {@inheritDoc} */ @@ -644,7 +644,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp /** */ protected HeartbeatSender() { - super(ignite.name(), "tcp-client-disco-heartbeat-sender", log); + super(gridName, "tcp-client-disco-heartbeat-sender", log); } /** {@inheritDoc} */ @@ -662,8 +662,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp while (!isInterrupted()) { U.sleep(hbFreq); - TcpDiscoveryHeartbeatMessage msg = - new TcpDiscoveryHeartbeatMessage(ignite.configuration().getNodeId()); + TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId()); msg.client(true); @@ -692,7 +691,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp * @param msgWrk Message worker. */ protected SocketReader(UUID nodeId, MessageWorker msgWrk) { - super(ignite.name(), "tcp-client-disco-sock-reader", log); + super(gridName, "tcp-client-disco-sock-reader", log); assert nodeId != null; assert msgWrk != null; @@ -760,7 +759,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp catch (IgniteCheckedException e) { if (log.isDebugEnabled()) U.error(log, "Failed to read message [sock=" + sock0 + ", " + - "locNodeId=" + ignite.configuration().getNodeId() + ", rmtNodeId=" + nodeId + ']', e); + "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + nodeId + ']', e); IOException ioEx = X.cause(e, IOException.class); @@ -775,14 +774,14 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp "[rmtNodeId=" + nodeId + ", err=" + clsNotFoundEx.getMessage() + ']'); else LT.error(log, e, "Failed to read message [sock=" + sock0 + ", locNodeId=" + - ignite.configuration().getNodeId() + ", rmtNodeId=" + nodeId + ']'); + getLocalNodeId() + ", rmtNodeId=" + nodeId + ']'); } } } catch (IOException e) { if (log.isDebugEnabled()) U.error(log, "Connection failed [sock=" + sock0 + ", locNodeId=" + - ignite.configuration().getNodeId() + ", rmtNodeId=" + nodeId + ']', e); + getLocalNodeId() + ", rmtNodeId=" + nodeId + ']', e); } finally { U.closeQuiet(sock0); @@ -877,7 +876,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp UUID newNodeId = node.id(); - if (ignite.configuration().getNodeId().equals(newNodeId)) { + if (getLocalNodeId().equals(newNodeId)) { if (joinLatch.getCount() > 0) { Collection<TcpDiscoveryNode> top = msg.topology(); @@ -933,7 +932,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp if (leaveLatch != null) return; - if (ignite.configuration().getNodeId().equals(msg.nodeId())) { + if (getLocalNodeId().equals(msg.nodeId())) { if (joinLatch.getCount() > 0) { long topVer = msg.topologyVersion(); @@ -986,7 +985,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp * @param msg Message. */ private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) { - if (ignite.configuration().getNodeId().equals(msg.creatorNodeId())) { + if (getLocalNodeId().equals(msg.creatorNodeId())) { if (log.isDebugEnabled()) log.debug("Received node left message for local node: " + msg); @@ -1031,7 +1030,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp if (leaveLatch != null) return; - if (!ignite.configuration().getNodeId().equals(msg.creatorNodeId())) { + if (!getLocalNodeId().equals(msg.creatorNodeId())) { TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId()); if (node == null) { @@ -1063,12 +1062,12 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp if (leaveLatch != null) return; - if (ignite.configuration().getNodeId().equals(msg.creatorNodeId())) { + if (getLocalNodeId().equals(msg.creatorNodeId())) { if (msg.senderNodeId() == null) { Socket sock0 = sock; if (sock0 != null) { - msg.setMetrics(ignite.configuration().getNodeId(), metricsProvider.metrics()); + msg.setMetrics(getLocalNodeId(), metricsProvider.metrics()); try { writeToSocket(sock0, msg); @@ -1117,7 +1116,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp if (leaveLatch != null) return; - if (ignite.configuration().getNodeId().equals(msg.creatorNodeId())) { + if (getLocalNodeId().equals(msg.creatorNodeId())) { if (msg.success()) { pending = true; @@ -1158,7 +1157,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp assert nodeId != null; assert metrics != null; - TcpDiscoveryNode node = nodeId.equals(ignite.configuration().getNodeId()) ? locNode : rmtNodes.get(nodeId); + TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId); if (node != null && node.visible()) { node.setMetrics(metrics); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65f34729/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 1cee279..bad8837 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -626,7 +626,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov @Nullable @Override public ClusterNode getNode(UUID nodeId) { assert nodeId != null; - UUID locNodeId0 = ignite.configuration().getNodeId(); + UUID locNodeId0 = getLocalNodeId(); if (locNodeId0 != null && locNodeId0.equals(nodeId)) // Return local node directly. @@ -703,7 +703,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } locNode = new TcpDiscoveryNode( - ignite.configuration().getNodeId(), + getLocalNodeId(), addrs.get1(), addrs.get2(), tcpSrvr.port, @@ -856,7 +856,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (hbFreq < 2000) U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq); - registerMBean(ignite.name(), this, TcpDiscoverySpiMBean.class); + registerMBean(gridName, this, TcpDiscoverySpiMBean.class); if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) { TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder); @@ -926,7 +926,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (msgWorker != null && msgWorker.isAlive() && !disconnect) { // Send node left message only if it is final stop. - msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(ignite.configuration().getNodeId())); + msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(getLocalNodeId())); synchronized (mux) { long threshold = U.currentTimeMillis() + netTimeout; @@ -1088,7 +1088,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov @Override public boolean pingNode(UUID nodeId) { assert nodeId != null; - if (nodeId == ignite.configuration().getNodeId()) + if (nodeId == getLocalNodeId()) return true; TcpDiscoveryNode node = ring.node(nodeId); @@ -1116,7 +1116,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov private boolean pingNode(TcpDiscoveryNode node) { assert node != null; - if (node.id().equals(ignite.configuration().getNodeId())) + if (node.id().equals(getLocalNodeId())) return true; UUID clientNodeId = null; @@ -1160,10 +1160,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov throws IgniteCheckedException { assert addr != null; - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); if (F.contains(locNodeAddrs, addr)) - return F.t(ignite.configuration().getNodeId(), false); + return F.t(getLocalNodeId(), false); GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>(); @@ -1388,7 +1388,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov @SuppressWarnings({"BusyWait"}) private boolean sendJoinRequestMessage() throws IgniteSpiException { TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode, - exchange.collect(ignite.configuration().getNodeId())); + exchange.collect(getLocalNodeId())); // Time when it has been detected, that addresses from IP finder do not respond. long noResStart = 0; @@ -1550,7 +1550,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov boolean joinReqSent = false; - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); for (int i = 0; i < reconCnt; i++) { // Need to set to false on each new iteration, @@ -1985,7 +1985,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * This method is intended for test purposes only. */ void simulateNodeFailure() { - U.warn(log, "Simulating node failure: " + ignite.configuration().getNodeId()); + U.warn(log, "Simulating node failure: " + getLocalNodeId()); U.interrupt(tcpSrvr); U.join(tcpSrvr, log); @@ -2034,7 +2034,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } if (next != null) - msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(ignite.configuration().getNodeId(), next.id(), + msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), next.id(), next.internalOrder())); } @@ -2085,7 +2085,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl()); b.append(">>>").append(U.nl()); - b.append("Local node ID: ").append(ignite.configuration().getNodeId()).append(U.nl()).append(U.nl()); + b.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl()); b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl()); b.append("SPI state: ").append(spiState).append(U.nl()).append(U.nl()); @@ -2141,7 +2141,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov assert debugMode; String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) + - '[' + Thread.currentThread().getName() + "][" + ignite.configuration().getNodeId() + + '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() + "-" + locNode.internalOrder() + "] " + msg; @@ -2205,7 +2205,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * Constructor. */ private HeartbeatsSender() { - super(ignite.name(), "tcp-disco-hb-sender", log); + super(gridName, "tcp-disco-hb-sender", log); setPriority(threadPri); } @@ -2227,9 +2227,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov return; } - TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(ignite.configuration().getNodeId()); + TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId()); - msg.verify(ignite.configuration().getNodeId()); + msg.verify(getLocalNodeId()); msgWorker.addMessage(msg); @@ -2249,7 +2249,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * Constructor. */ private CheckStatusSender() { - super(ignite.name(), "tcp-disco-status-check-sender", log); + super(gridName, "tcp-disco-status-check-sender", log); setPriority(threadPri); } @@ -2313,7 +2313,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * Constructor. */ private IpFinderCleaner() { - super(ignite.name(), "tcp-disco-ip-finder-cleaner", log); + super(gridName, "tcp-disco-ip-finder-cleaner", log); setPriority(threadPri); } @@ -2579,7 +2579,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); if (wrk != null) { - msg.verify(ignite.configuration().getNodeId()); + msg.verify(getLocalNodeId()); wrk.addMessage(msg); } @@ -2638,7 +2638,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov boolean searchNext = true; - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); while (true) { if (searchNext) { @@ -3035,7 +3035,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov TcpDiscoveryNode node = msg.node(); - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); if (!msg.client()) { boolean rmtHostLoopback = node.socketAddresses().size() == 1 && @@ -3354,7 +3354,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param msg Client reconnect message. */ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) { - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); boolean isLocalNodeRouter = locNodeId.equals(msg.routerNodeId()); @@ -3436,7 +3436,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov return; } - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); if (isLocalNodeCoordinator()) { if (msg.verified()) { @@ -3650,7 +3650,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov boolean locNodeCoord = isLocalNodeCoordinator(); - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); if (locNodeCoord) { if (msg.verified()) { @@ -3759,7 +3759,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) { assert msg != null; - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); UUID leavingNodeId = msg.creatorNodeId(); @@ -4004,7 +4004,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov boolean locNodeCoord = isLocalNodeCoordinator(); - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); if (locNodeCoord) { if (msg.verified()) { @@ -4093,7 +4093,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov private void processStatusCheckMessage(TcpDiscoveryStatusCheckMessage msg) { assert msg != null; - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); if (msg.failedNodeId() != null) { if (locNodeId.equals(msg.failedNodeId())) { @@ -4215,7 +4215,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) { assert msg != null; - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); if (ring.node(msg.creatorNodeId()) == null) { if (log.isDebugEnabled()) @@ -4346,9 +4346,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov assert msgId != null; if (isLocalNodeCoordinator()) { - if (!ignite.configuration().getNodeId().equals(msg.verifierNodeId())) + if (!getLocalNodeId().equals(msg.verifierNodeId())) // Message is not verified or verified by former coordinator. - msg.verify(ignite.configuration().getNodeId()); + msg.verify(getLocalNodeId()); else // Discard the message. return; @@ -4424,7 +4424,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @throws IgniteSpiException In case of error. */ TcpServer() throws IgniteSpiException { - super(ignite.name(), "tcp-disco-srvr", log); + super(gridName, "tcp-disco-srvr", log); setPriority(threadPri); @@ -4522,7 +4522,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param sock Socket to read data from. */ SocketReader(Socket sock) { - super(ignite.name(), "tcp-disco-sock-reader", log); + super(gridName, "tcp-disco-sock-reader", log); this.sock = sock; @@ -4533,7 +4533,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { - UUID locNodeId = ignite.configuration().getNodeId(); + UUID locNodeId = getLocalNodeId(); try { InputStream in; @@ -4995,7 +4995,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (state == CONNECTING) { if (noResAddrs.contains(rmtAddr) || - ignite.configuration().getNodeId().compareTo(msg.creatorNodeId()) < 0) + getLocalNodeId().compareTo(msg.creatorNodeId()) < 0) // Remote node node has not responded to join request or loses UUID race. res = RES_WAIT; else @@ -5053,7 +5053,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * Constructor. */ StatisticsPrinter() { - super(ignite.name(), "tcp-disco-stats-printer", log); + super(gridName, "tcp-disco-stats-printer", log); assert statsPrintFreq > 0; @@ -5130,7 +5130,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" - + ignite.configuration().getNodeId() + ", rmtNodeId=" + nodeId + ", msg=" + msg + ']'); + + getLocalNodeId() + ", rmtNodeId=" + nodeId + ", msg=" + msg + ']'); try { prepareNodeAddedMessage(msg, nodeId, null, null); @@ -5144,10 +5144,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov catch (IgniteCheckedException | IOException e) { if (log.isDebugEnabled()) U.error(log, "Client connection failed [sock=" + sock + ", locNodeId=" - + ignite.configuration().getNodeId() + ", rmtNodeId=" + nodeId + ", msg=" + msg + ']', e); + + getLocalNodeId() + ", rmtNodeId=" + nodeId + ", msg=" + msg + ']', e); onException("Client connection failed [sock=" + sock + ", locNodeId=" - + ignite.configuration().getNodeId() + ", rmtNodeId=" + nodeId + ", msg=" + msg + ']', e); + + getLocalNodeId() + ", rmtNodeId=" + nodeId + ", msg=" + msg + ']', e); U.interrupt(clientMsgWorkers.remove(nodeId)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65f34729/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java index 1d9559e..e949846 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java @@ -152,6 +152,8 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov */ @IgniteInstanceResource protected void injectResources(Ignite ignite) { + super.injectResources(ignite); + // Inject resource. if (ignite != null) setLocalAddress(ignite.configuration().getLocalHost()); @@ -767,7 +769,7 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov * */ SocketTimeoutWorker() { - super(ignite.name(), "tcp-disco-sock-timeout-worker", log); + super(gridName, "tcp-disco-sock-timeout-worker", log); setPriority(threadPri); } @@ -935,7 +937,7 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov * @param name Thread name. */ protected MessageWorkerAdapter(String name) { - super(ignite.name(), name, log); + super(gridName, name, log); setPriority(threadPri); } @@ -943,7 +945,7 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { if (log.isDebugEnabled()) - log.debug("Message worker started [locNodeId=" + ignite.configuration().getNodeId() + ']'); + log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']'); while (!isInterrupted()) { TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65f34729/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java index 1d2670e..5df37d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java @@ -257,7 +257,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi, registerMBean(gridName, this, FileSwapSpaceSpiMBean.class); - String path = baseDir + File.separator + gridName + File.separator + ignite.configuration().getNodeId(); + String path = baseDir + File.separator + gridName + File.separator + getLocalNodeId(); try { dir = U.resolveWorkDirectory(path, true); @@ -1433,8 +1433,6 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi, final Object mux = new Object(); - String gridName = ignite.name(); - writer = new IgniteSpiThread(gridName, "Swap writer: " + name, log) { @Override protected void body() throws InterruptedException { while (!isInterrupted()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65f34729/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java index 0492dd1..bfed977 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.*; import org.apache.ignite.testframework.junits.spi.*; @@ -280,7 +281,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS for (int i = 0; i < getSpiCount(); i++) { CommunicationSpi<Message> spi = getSpi(i); - GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i); IgniteTestResources rsrcs = new IgniteTestResources(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65f34729/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index 2ccb89d..c038ee7 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.*; @@ -317,7 +318,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic for (int i = 0; i < SPI_CNT; i++) { CommunicationSpi<Message> spi = createSpi(); - GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i); IgniteTestResources rsrcs = new IgniteTestResources(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65f34729/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index 692de62..e7ae957 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.*; @@ -439,7 +440,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac for (int i = 0; i < getSpiCount(); i++) { CommunicationSpi<Message> spi = newCommunicationSpi(); - GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i); IgniteTestResources rsrcs = new IgniteTestResources(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65f34729/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index f1d0847..c0f0b11 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.*; @@ -343,7 +344,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS for (int i = 0; i < SPI_CNT; i++) { TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); - GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i); IgniteTestResources rsrcs = new IgniteTestResources(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65f34729/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index 1b941f1..7463388 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -632,7 +632,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> for (int i = 0; i < SPI_CNT; i++) { TcpCommunicationSpi spi = getSpi(i); - GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i); IgniteTestResources rsrcs = new IgniteTestResources();