# ignite-537
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d25627c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d25627c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d25627c9 Branch: refs/heads/ignite-721 Commit: d25627c9af1038c7c7c6a573fa0b39ccd48dceb1 Parents: 49996e7 Author: sboikov <sboi...@gridgain.com> Authored: Fri Apr 10 16:16:29 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Apr 10 16:16:29 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/GridComponent.java | 5 +- .../ignite/internal/GridPluginComponent.java | 5 +- .../internal/managers/GridManagerAdapter.java | 4 +- .../discovery/GridDiscoveryManager.java | 10 +- .../processors/GridProcessorAdapter.java | 5 +- .../processors/cache/GridCacheProcessor.java | 4 +- .../continuous/GridContinuousProcessor.java | 4 +- .../plugin/IgnitePluginProcessor.java | 13 +- .../apache/ignite/plugin/PluginProvider.java | 5 +- .../spi/discovery/DiscoverySpiDataExchange.java | 5 +- .../discovery/tcp/TcpClientDiscoverySpi.java | 4 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 364 ++++++++----------- .../discovery/tcp/TcpDiscoverySpiAdapter.java | 4 +- .../tcp/messages/TcpDiscoveryClassRequest.java | 55 +++ .../tcp/messages/TcpDiscoveryClassResponse.java | 76 ++++ .../TcpDiscoveryCustomEventMessage.java | 7 + .../messages/TcpDiscoveryGetClassRequest.java | 48 --- .../messages/TcpDiscoveryGetClassResponse.java | 70 ---- .../discovery/AbstractDiscoverySelfTest.java | 4 +- .../tcp/TcpDiscoverySpiStartStopSelfTest.java | 5 +- .../junits/spi/GridSpiAbstractTest.java | 4 +- .../startcache/CacheConfigP2PStartClient.java | 38 +- 22 files changed, 363 insertions(+), 376 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index 9f08d6e..fb227cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; +import java.io.*; import java.util.*; /** @@ -81,7 +82,7 @@ public interface GridComponent { * @return Discovery data object or {@code null} if there is nothing * to send for this component. */ - @Nullable public Object collectDiscoveryData(UUID nodeId); + @Nullable public Serializable collectDiscoveryData(UUID nodeId); /** * Receives discovery data object from remote nodes (called @@ -90,7 +91,7 @@ public interface GridComponent { * @param rmtNodeId Remote node ID for which data is provided. * @param data Discovery data object or {@code null} if nothing was */ - public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Object data); + public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data); /** * Prints memory statistics (sizes of internal structures, etc.). http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java index 305a7e3..b438bc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java @@ -23,6 +23,7 @@ import org.apache.ignite.plugin.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; +import java.io.*; import java.util.*; /** @@ -73,12 +74,12 @@ public class GridPluginComponent implements GridComponent { } /** {@inheritDoc} */ - @Nullable @Override public Object collectDiscoveryData(UUID nodeId) { + @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { return null; } /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Object data) { + @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index a0521c8..cb91313 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -571,12 +571,12 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan } /** {@inheritDoc} */ - @Override @Nullable public Object collectDiscoveryData(UUID nodeId) { + @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { return null; } /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Object data) { + @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 299b3c2..c073596 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -419,13 +419,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { }); spi.setDataExchange(new DiscoverySpiDataExchange() { - @Override public Map<Integer, Object> collect(UUID nodeId) { + @Override public Map<Integer, Serializable> collect(UUID nodeId) { assert nodeId != null; - Map<Integer, Object> data = new HashMap<>(); + Map<Integer, Serializable> data = new HashMap<>(); for (GridComponent comp : ctx.components()) { - Object compData = comp.collectDiscoveryData(nodeId); + Serializable compData = comp.collectDiscoveryData(nodeId); if (compData != null) { assert comp.discoveryDataType() != null; @@ -437,8 +437,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return data; } - @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Object> data) { - for (Map.Entry<Integer, Object> e : data.entrySet()) { + @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) { + for (Map.Entry<Integer, Serializable> e : data.entrySet()) { GridComponent comp = null; for (GridComponent c : ctx.components()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java index 39ccd7a..a84c48a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; +import java.io.*; import java.util.*; /** @@ -66,12 +67,12 @@ public abstract class GridProcessorAdapter implements GridProcessor { } /** {@inheritDoc} */ - @Override @Nullable public Object collectDiscoveryData(UUID nodeId) { + @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { return null; } /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Object data) { + @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index cc90cd3..2859061 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1587,7 +1587,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Nullable @Override public Object collectDiscoveryData(UUID nodeId) { + @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { // Collect dynamically started caches to a single object. Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size()); @@ -1626,7 +1626,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Object data) { + @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) { if (data instanceof DynamicCacheChangeBatch) { DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 54da8e6..0d76ad4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -318,7 +318,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override @Nullable public Object collectDiscoveryData(UUID nodeId) { + @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { if (!nodeId.equals(ctx.localNodeId())) { pendingLock.lock(); @@ -349,7 +349,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(UUID nodeId, UUID rmtNodeId, Object obj) { + @Override public void onDiscoveryDataReceived(UUID nodeId, UUID rmtNodeId, Serializable obj) { DiscoveryData data = (DiscoveryData)obj; if (!ctx.isDaemon() && data != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java index 0b9ca5e..8f2cc51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.*; import org.jetbrains.annotations.*; +import java.io.*; import java.lang.reflect.*; import java.security.*; import java.util.*; @@ -153,11 +154,11 @@ public class IgnitePluginProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Nullable @Override public Object collectDiscoveryData(UUID nodeId) { - Map<String, Object> discData = null; + @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { + HashMap<String, Serializable> discData = null; for (Map.Entry<String, PluginProvider> e : plugins.entrySet()) { - Object data = e.getValue().provideDiscoveryData(nodeId); + Serializable data = e.getValue().provideDiscoveryData(nodeId); if (data != null) { if (discData == null) @@ -171,11 +172,11 @@ public class IgnitePluginProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(UUID nodeId, UUID rmtNodeId, Object data) { - Map<String, Object> discData = (Map<String, Object>)data; + @Override public void onDiscoveryDataReceived(UUID nodeId, UUID rmtNodeId, Serializable data) { + Map<String, Serializable> discData = (Map<String, Serializable>)data; if (discData != null) { - for (Map.Entry<String, Object> e : discData.entrySet()) { + for (Map.Entry<String, Serializable> e : discData.entrySet()) { PluginProvider provider = plugins.get(e.getKey()); if (provider != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java b/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java index d2aedff..17bbc36 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.jetbrains.annotations.*; +import java.io.*; import java.util.*; /** @@ -101,7 +102,7 @@ public interface PluginProvider<C extends PluginConfiguration> { * @return Discovery data object or {@code null} if there is nothing * to send for this component. */ - @Nullable public Object provideDiscoveryData(UUID nodeId); + @Nullable public Serializable provideDiscoveryData(UUID nodeId); /** * Receives plugin discovery data object from remote nodes (called @@ -112,7 +113,7 @@ public interface PluginProvider<C extends PluginConfiguration> { * @param data Discovery data object or {@code null} if nothing was * sent for this component. */ - public void receiveDiscoveryData(UUID nodeId, Object data); + public void receiveDiscoveryData(UUID nodeId, Serializable data); /** * Validates that new node can join grid topology, this method is called on coordinator http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java index 29f6b5f..46d6716 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery; +import java.io.*; import java.util.*; /** @@ -33,7 +34,7 @@ public interface DiscoverySpiDataExchange { * @param joiningNodeId ID of new node that joins topology. * @return Collection of discovery data objects from different components. */ - public Map<Integer, Object> collect(UUID joiningNodeId); + public Map<Integer, Serializable> collect(UUID joiningNodeId); /** * Notifies discovery manager about data received from remote node. @@ -41,5 +42,5 @@ public interface DiscoverySpiDataExchange { * @param joiningNodeId Remote node ID. * @param data Collection of discovery data objects from different components. */ - public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Object> data); + public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 2ab8e8f..312b940 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -900,7 +900,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp Map<UUID, Map<Integer, byte[]>> dataMap = msg.oldNodesDiscoveryData(); - if (dataMap != null) { + if (!locNode.isDaemon() && dataMap != null) { for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) onExchange(newNodeId, entry.getKey(), entry.getValue(), null); } @@ -924,7 +924,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); - if (data != null) + if (!locNode.isDaemon() && data != null) onExchange(newNodeId, newNodeId, data, null); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 94fbfee..21f1c97 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -239,9 +239,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private CheckStatusSender chkStatusSnd; - /** IP finder cleaner. */ + /** IP finder and p2p loaders cleaner. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private IpFinderCleaner ipFinderCleaner; + private DiscoveryCleaner cleaner; /** Statistics printer thread. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") @@ -289,15 +289,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private ConcurrentLinkedDeque<String> debugLog; - /** Thread periodically closing unused p2p class loader connections. */ - private DeploymentClassLoadersCleaner p2pLdrCleaner; - /** Class loaders for event data unmarshalling. */ private ConcurrentMap<UUID, DiscoveryDeploymentClassLoader> p2pLdrs = new ConcurrentHashMap<>(); - /** Class loader for local data unmarshalling. */ - private ClassLoader locLdr = U.gridClassLoader(); - /** {@inheritDoc} */ @IgniteInstanceResource @Override public void injectResources(Ignite ignite) { @@ -770,14 +764,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov chkStatusSnd = new CheckStatusSender(); chkStatusSnd.start(); - if (ipFinder.isShared()) { - ipFinderCleaner = new IpFinderCleaner(); - ipFinderCleaner.start(); - } - - if (ignite.configuration().isPeerClassLoadingEnabled()) { - p2pLdrCleaner = new DeploymentClassLoadersCleaner(); - p2pLdrCleaner.start(); + if (ipFinder.isShared() || ignite.configuration().isPeerClassLoadingEnabled()) { + cleaner = new DiscoveryCleaner(); + cleaner.start(); } if (log.isDebugEnabled() && !restart) @@ -992,8 +981,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov U.interrupt(chkStatusSnd); U.join(chkStatusSnd, log); - U.interrupt(ipFinderCleaner); - U.join(ipFinderCleaner, log); + U.interrupt(cleaner); + U.join(cleaner, log); U.interrupt(msgWorker); U.join(msgWorker, log); @@ -1004,9 +993,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov U.interrupt(statsPrinter); U.join(statsPrinter, log); - U.interrupt(p2pLdrCleaner); - U.join(p2pLdrCleaner, log); - if (ipFinder != null) ipFinder.close(); @@ -1265,16 +1251,16 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** {@inheritDoc} */ @Override public void sendCustomEvent(Serializable evt) { - byte[] msgBytes; - try { + byte[] msgBytes; + msgBytes = marsh.marshal(evt); + + msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), msgBytes)); } catch (IgniteCheckedException e) { throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); } - - msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), msgBytes)); } /** @@ -2024,8 +2010,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov U.interrupt(chkStatusSnd); U.join(chkStatusSnd, log); - U.interrupt(ipFinderCleaner); - U.join(ipFinderCleaner, log); + U.interrupt(cleaner); + U.join(cleaner, log); Collection<SocketReader> tmp; @@ -2123,7 +2109,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl()); b.append(" Socket timeout worker: ").append(threadStatus(sockTimeoutWorker)).append(U.nl()); - b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl()); + b.append(" Cleaner: ").append(threadStatus(cleaner)).append(U.nl()); b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl()); b.append(U.nl()); @@ -2224,12 +2210,12 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param req Get class request. * @return Get class response. */ - private TcpDiscoveryGetClassResponse processGetClassRequest(TcpDiscoveryGetClassRequest req) { + private TcpDiscoveryClassResponse processGetClassRequest(TcpDiscoveryClassRequest req) { assert !F.isEmpty(req.className()) : req; String rsrc = U.classNameToResourceName(req.className()); - InputStream in = locLdr.getResourceAsStream(rsrc); + InputStream in = U.gridClassLoader().getResourceAsStream(rsrc); byte[] clsBytes = null; String err = null; @@ -2243,7 +2229,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov clsBytes = bytes.entireArray(); } catch (IOException e) { - err = "Failed to load class '" + req.className() + "' due IO error: " + e; + err = "Failed to load class due IO error [cls=" + req.className() + ", err=" + e + ']'; U.error(log, err, e); } @@ -2255,22 +2241,121 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to find requested class: " + req.className()); - err = "Class '" + req.className() + "' not found."; + err = "Failed to find requested class: " + req.className(); } - TcpDiscoveryGetClassResponse res; + TcpDiscoveryClassResponse res; if (clsBytes != null) - res = new TcpDiscoveryGetClassResponse(getLocalNodeId(), clsBytes); + res = new TcpDiscoveryClassResponse(getLocalNodeId(), clsBytes); else { assert err != null; - res = new TcpDiscoveryGetClassResponse(getLocalNodeId(), err); + res = new TcpDiscoveryClassResponse(getLocalNodeId(), err); } return res; } + /** + * @param node Node created event. + * @return Class loader for custom event unmarshalling. + */ + @Nullable protected ClassLoader customMessageClassLoader(TcpDiscoveryNode node) { + assert ignite != null; + + if (!ignite.configuration().isPeerClassLoadingEnabled()) + return null; + + if (node.id().equals(getLocalNodeId()) || node.isClient()) + return U.gridClassLoader(); + + DiscoveryDeploymentClassLoader ldr = p2pLdrs.get(node.id()); + + if (ldr == null) + ldr = F.addIfAbsent(p2pLdrs, node.id(), new DiscoveryDeploymentClassLoader(node)); + + return ldr; + } + + /** + * @param joiningNode Joining node. + * @param nodeId Remote node provided data. + * @return Class loader for exchange data unmarshalling. + */ + @Nullable protected ClassLoader exchangeClassLoader(TcpDiscoveryNode joiningNode, UUID nodeId) { + assert joiningNode != null; + assert ignite != null; + + if (!ignite.configuration().isPeerClassLoadingEnabled()) + return null; + + if (nodeId.equals(getLocalNodeId())) + return U.gridClassLoader(); + + TcpDiscoveryNode node; + + if (joiningNode.id().equals(nodeId)) + node = joiningNode; + else { + node = ring.node(nodeId); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Node provided exchange data left, will use local class loader " + + "for exchange data [nodeId=" + nodeId + ']'); + + return U.gridClassLoader(); + } + } + + if (node.isClient()) // Do not support loading from client nodes. + return U.gridClassLoader(); + + DiscoveryDeploymentClassLoader ldr = p2pLdrs.get(nodeId); + + if (ldr == null) + ldr = F.addIfAbsent(p2pLdrs, nodeId, new DiscoveryDeploymentClassLoader(node)); + + return ldr; + } + + /** + * @param nodeId Node ID. + * @return Marshalled exchange data. + * @throws IgniteSpiException If failed. + */ + private Map<Integer, byte[]> collectExchangeData(UUID nodeId) throws IgniteSpiException { + Map<Integer, Serializable> data = exchange.collect(nodeId); + + Map<Integer, byte[]> data0 = U.newHashMap(data.size()); + + for (Map.Entry<Integer, Serializable> entry : data.entrySet()) { + try { + byte[] bytes = marsh.marshal(entry.getValue()); + + data0.put(entry.getKey(), bytes); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal discovery data " + + "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e); + + throw new IgniteSpiException("Failed to marshal discovery data.", e); + } + } + + return data0; + } + + /** + * @param msg Message. + * @param nodeId Node ID. + */ + private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { + msg.removeMetrics(nodeId); + msg.removeCacheMetrics(nodeId); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoverySpi.class, this); @@ -2381,18 +2466,19 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** - * Thread that cleans IP finder and keeps it in the correct state, unregistering - * addresses of the nodes that has left the topology. + * Thread that periodically tries to release p2p class loaders connections, cleans + * IP finder and keeps it in the correct state, unregistering addresses of the nodes + * that has left the topology. * <p> - * This thread should run only on coordinator node and will clean IP finder + * IP finder cleaner should run only on coordinator node and will clean IP finder * if and only if {@link TcpDiscoveryIpFinder#isShared()} is {@code true}. */ - private class IpFinderCleaner extends IgniteSpiThread { + private class DiscoveryCleaner extends IgniteSpiThread { /** * Constructor. */ - private IpFinderCleaner() { - super(gridName, "tcp-disco-ip-finder-cleaner", log); + private DiscoveryCleaner() { + super(gridName, "tcp-disco-cleaner", log); setPriority(threadPri); } @@ -2401,11 +2487,21 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov @SuppressWarnings("BusyWait") @Override protected void body() throws InterruptedException { if (log.isDebugEnabled()) - log.debug("IP finder cleaner has been started."); + log.debug("Tcp discovery cleaner has been started."); while (!isInterrupted()) { Thread.sleep(ipFinderCleanFreq); + for (DiscoveryDeploymentClassLoader ldr : p2pLdrs.values()) { + if (ring.node(ldr.nodeId()) == null) { + ldr.onNodeLeft(); + + p2pLdrs.remove(ldr.nodeId(), ldr); + } + else + ldr.closeConnectionIfNotUsed(); + } + if (!isLocalNodeCoordinator()) continue; @@ -2505,41 +2601,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** - * - */ - private class DeploymentClassLoadersCleaner extends IgniteSpiThread { - /** - * Constructor. - */ - private DeploymentClassLoadersCleaner() { - super(gridName, "tcp-disco-p2p-ldr-cleaner", log); - - setPriority(threadPri); - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Deployment class loader cleaner has been started."); - - while (!isInterrupted()) { - Thread.sleep(5000); - - for (DiscoveryDeploymentClassLoader ldr : p2pLdrs.values()) { - if (ring.node(ldr.nodeId()) == null) { - ldr.onNodeLeft(); - - p2pLdrs.remove(ldr.nodeId(), ldr); - } - else - ldr.closeConnectionIfNotUsed(); - } - } - } - } - - /** * Pending messages container. */ private static class PendingMessages { @@ -3656,7 +3717,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); - if (data != null) + if (!locNode.isDaemon() && data != null) onExchange(node.id(), node.id(), data, exchangeClassLoader(node, node.id())); msg.addDiscoveryData(locNodeId, collectExchangeData(node.id())); @@ -3727,7 +3788,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } // Notify outside of synchronized block. - if (dataMap != null) { + if (!locNode.isDaemon() && dataMap != null) { for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) { onExchange(node.id(), entry.getKey(), @@ -4529,7 +4590,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion()); - if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) { + if (!locNode.isDaemon() && lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) { assert msg.messageBytes() != null; TcpDiscoveryNode node = ring.node(msg.creatorNodeId()); @@ -5024,8 +5085,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov continue; } - else if (msg instanceof TcpDiscoveryGetClassRequest) { - TcpDiscoveryGetClassResponse res = processGetClassRequest((TcpDiscoveryGetClassRequest)msg); + else if (msg instanceof TcpDiscoveryClassRequest) { + TcpDiscoveryClassResponse res = processGetClassRequest((TcpDiscoveryClassRequest)msg); writeToSocket(sock, res); @@ -5324,104 +5385,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** - * @param node Node created event. - * @return Class loader for custom event unmarshalling. - */ - @Nullable protected ClassLoader customMessageClassLoader(TcpDiscoveryNode node) { - assert ignite != null; - - if (!ignite.configuration().isPeerClassLoadingEnabled()) - return null; - - if (node.id().equals(getLocalNodeId())) - return locLdr; - - DiscoveryDeploymentClassLoader ldr = p2pLdrs.get(node.id()); - - if (ldr == null) - ldr = F.addIfAbsent(p2pLdrs, node.id(), new DiscoveryDeploymentClassLoader(node)); - - return ldr; - } - - /** - * @param joiningNode Joining node. - * @param nodeId Remote node provided data. - * @return Class loader for exchange data unmarshalling. - */ - @Nullable protected ClassLoader exchangeClassLoader(TcpDiscoveryNode joiningNode, UUID nodeId) { - assert joiningNode != null; - assert ignite != null; - - if (!ignite.configuration().isPeerClassLoadingEnabled()) - return null; - - if (nodeId.equals(getLocalNodeId())) - return locLdr; - - TcpDiscoveryNode node; - - if (joiningNode.id().equals(nodeId)) - node = joiningNode; - else { - node = ring.node(nodeId); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Node provided exchange data left, will use local class loader " + - "for exchange data [nodeId=" + nodeId + ']'); - - return locLdr; - } - } - - if (node.isClient()) // Do not support loading from client nodes. - return locLdr; - - DiscoveryDeploymentClassLoader ldr = p2pLdrs.get(nodeId); - - if (ldr == null) - ldr = F.addIfAbsent(p2pLdrs, nodeId, new DiscoveryDeploymentClassLoader(node)); - - return ldr; - } - - /** - * @param nodeId Node ID. - * @return Marshalled exchange data. - * @throws IgniteSpiException If failed. - */ - private Map<Integer, byte[]> collectExchangeData(UUID nodeId) throws IgniteSpiException { - Map<Integer, Object> data = exchange.collect(nodeId); - - Map<Integer, byte[]> data0 = U.newHashMap(data.size()); - - for (Map.Entry<Integer, Object> entry : data.entrySet()) { - try { - byte[] bytes = marsh.marshal(entry.getValue()); - - data0.put(entry.getKey(), bytes); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal discovery data " + - "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e); - - throw new IgniteSpiException("Failed to marshal discovery data.", e); - } - } - - return data0; - } - - /** * */ private class DiscoveryDeploymentClassLoader extends ClassLoader { /** */ - private final UUID nodeId; - - /** */ - private volatile TcpDiscoveryNode node; + private final TcpDiscoveryNode node; /** */ private Socket sock; @@ -5437,15 +5405,13 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov assert !node.id().equals(getLocalNodeId()); this.node = node; - - nodeId = node.id(); } /** * @return Target node ID. */ UUID nodeId() { - return nodeId; + return node.id(); } /** @@ -5457,14 +5423,12 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov try { if (sock != null) { if (log.isDebugEnabled()) - log.debug("Closing deployment class loader connection on node left [node=" + nodeId + ']'); + log.debug("Closing deployment class loader connection on node left [node=" + node.id() + ']'); U.closeQuiet(sock); sock = null; } - - node = null; } finally { lock.writeLock().unlock(); @@ -5479,7 +5443,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov try { if (sock != null) { if (log.isDebugEnabled()) - log.debug("Closing idle deployment class loader connection [node=" + nodeId + ']'); + log.debug("Closing idle deployment class loader connection [node=" + node.id() + ']'); U.closeQuiet(sock); @@ -5494,18 +5458,14 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** {@inheritDoc} */ @Override protected Class<?> findClass(String name) throws ClassNotFoundException { - if (node == null) - throw new ClassNotFoundException("Failed to load class, peer node left " + - "[cls=" +name + ", node=" + nodeId + ']'); - lock.readLock().lock(); try { - TcpDiscoveryGetClassResponse res = requestClass(name); + TcpDiscoveryClassResponse res = requestClass(name); if (res == null) throw new ClassNotFoundException("Failed to load class, can not connect to peer node " + - "[cls=" + name + ", node=" + nodeId + ']'); + "[cls=" + name + ", node=" + node.id() + ']'); if (res.error() != null) throw new ClassNotFoundException(res.error()); @@ -5523,17 +5483,12 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param name Class name. * @return Class response or {@code null} if failed to connect. */ - @Nullable private synchronized TcpDiscoveryGetClassResponse requestClass(String name) { - TcpDiscoveryGetClassRequest msg = new TcpDiscoveryGetClassRequest(getLocalNodeId(), name); + @Nullable private synchronized TcpDiscoveryClassResponse requestClass(String name) { + TcpDiscoveryClassRequest msg = new TcpDiscoveryClassRequest(getLocalNodeId(), name); for (int i = 0; i < reconCnt; i++) { if (sock == null) { - TcpDiscoveryNode node0 = node; - - if (node0 == null) - return null; // Node left. - - sock = connect(node0); + sock = connect(node); if (sock == null) break; @@ -5549,8 +5504,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } } - node = null; // Consider node failed. - p2pLdrs.remove(nodeId, this); return null; @@ -5563,7 +5516,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @throws IOException If request failed. * @throws IgniteCheckedException If request failed. */ - private TcpDiscoveryGetClassResponse request(Socket sock, TcpDiscoveryGetClassRequest msg) + private TcpDiscoveryClassResponse request(Socket sock, TcpDiscoveryClassRequest msg) throws IOException, IgniteCheckedException { long tstamp = U.currentTimeMillis(); @@ -5572,7 +5525,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); - TcpDiscoveryGetClassResponse res = readMessage(sock, null, netTimeout); + TcpDiscoveryClassResponse res = readMessage(sock, null, netTimeout); stats.onMessageReceived(res); @@ -5630,13 +5583,4 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov return null; } } - - /** - * @param msg Message. - * @param nodeId Node ID. - */ - private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { - msg.removeMetrics(nodeId); - msg.removeCacheMetrics(nodeId); - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java index db75051..98e048d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java @@ -754,11 +754,11 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov Map<Integer, byte[]> data, ClassLoader clsLdr) { - Map<Integer, Object> data0 = U.newHashMap(data.size()); + Map<Integer, Serializable> data0 = U.newHashMap(data.size()); for (Map.Entry<Integer, byte[]> entry : data.entrySet()) { try { - Object compData = marsh.unmarshal(entry.getValue(), clsLdr); + Serializable compData = marsh.unmarshal(entry.getValue(), clsLdr); data0.put(entry.getKey(), compData); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClassRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClassRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClassRequest.java new file mode 100644 index 0000000..befe483 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClassRequest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * + */ +public class TcpDiscoveryClassRequest extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private String clsName; + + /** + * @param creatorNodeId Creator node ID. + * @param clsName Class name. + */ + public TcpDiscoveryClassRequest(UUID creatorNodeId, String clsName) { + super(creatorNodeId); + + this.clsName = clsName; + } + + /** + * @return Class name. + */ + public String className() { + return clsName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClassRequest.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClassResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClassResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClassResponse.java new file mode 100644 index 0000000..3bfe61d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClassResponse.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * + */ +public class TcpDiscoveryClassResponse extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private String errMsg; + + /** */ + private byte[] clsBytes; + + /** + * @param creatorNodeId Creator node ID. + * @param clsBytes Class bytes. + */ + public TcpDiscoveryClassResponse(UUID creatorNodeId, byte[] clsBytes) { + super(creatorNodeId); + + this.clsBytes = clsBytes; + } + + /** + * @param creatorNodeId Creator node ID. + * @param errMsg Error message. + */ + public TcpDiscoveryClassResponse(UUID creatorNodeId, String errMsg) { + super(creatorNodeId); + + this.errMsg = errMsg; + } + + /** + * @return Error if class loading failed. + */ + @Nullable public String error() { + return errMsg; + } + + /** + * @return Loaded class bytes. + */ + public byte[] classBytes() { + return clsBytes; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClassResponse.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index 6f78953..3144b93 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -17,6 +17,8 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import org.apache.ignite.internal.util.typedef.internal.*; + import java.util.*; /** @@ -46,4 +48,9 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage public byte[] messageBytes() { return msgBytes; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryGetClassRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryGetClassRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryGetClassRequest.java deleted file mode 100644 index 0f6a0c4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryGetClassRequest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp.messages; - -import java.util.*; - -/** - * - */ -public class TcpDiscoveryGetClassRequest extends TcpDiscoveryAbstractMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private String clsName; - - /** - * @param creatorNodeId Creator node ID. - * @param clsName Class name. - */ - public TcpDiscoveryGetClassRequest(UUID creatorNodeId, String clsName) { - super(creatorNodeId); - - this.clsName = clsName; - } - - /** - * @return Class name. - */ - public String className() { - return clsName; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryGetClassResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryGetClassResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryGetClassResponse.java deleted file mode 100644 index d377aee..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryGetClassResponse.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp.messages; - -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * - */ -public class TcpDiscoveryGetClassResponse extends TcpDiscoveryAbstractMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private String errMsg; - - /** */ - private byte[] clsBytes; - - /** - * @param creatorNodeId Creator node ID. - * @param clsBytes Class bytes. - */ - public TcpDiscoveryGetClassResponse(UUID creatorNodeId, byte[] clsBytes) { - super(creatorNodeId); - - this.clsBytes = clsBytes; - } - - /** - * @param creatorNodeId Creator node ID. - * @param errMsg Error message. - */ - public TcpDiscoveryGetClassResponse(UUID creatorNodeId, String errMsg) { - super(creatorNodeId); - - this.errMsg = errMsg; - } - - /** - * @return Error if class loading failed. - */ - @Nullable public String error() { - return errMsg; - } - - /** - * @return Loaded class bytes. - */ - public byte[] classBytes() { - return clsBytes; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java index 234e3cd..3c61f00 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java @@ -379,11 +379,11 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri }); spi.setDataExchange(new DiscoverySpiDataExchange() { - @Override public Map<Integer, Object> collect(UUID nodeId) { + @Override public Map<Integer, Serializable> collect(UUID nodeId) { return new HashMap<>(); } - @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Object> data) { + @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) { // No-op. } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java index b0e22b4..1ce0bcd 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java @@ -23,6 +23,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.junits.spi.*; +import java.io.*; import java.util.*; /** @@ -44,11 +45,11 @@ public class TcpDiscoverySpiStartStopSelfTest extends GridSpiStartStopAbstractTe @GridSpiTestConfig public DiscoverySpiDataExchange getDataExchange() { return new DiscoverySpiDataExchange() { - @Override public Map<Integer, Object> collect(UUID nodeId) { + @Override public Map<Integer, Serializable> collect(UUID nodeId) { return null; } - @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Object> data) { + @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) { // No-op. } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java index bc6b4cb..de8596d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java @@ -208,11 +208,11 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr discoSpi.setMetricsProvider(createMetricsProvider()); discoSpi.setDataExchange(new DiscoverySpiDataExchange() { - @Override public Map<Integer, Object> collect(UUID nodeId) { + @Override public Map<Integer, Serializable> collect(UUID nodeId) { return new HashMap<>(); } - @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Object> data) { + @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) { } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d25627c9/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/startcache/CacheConfigP2PStartClient.java ---------------------------------------------------------------------- diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/startcache/CacheConfigP2PStartClient.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/startcache/CacheConfigP2PStartClient.java index b5e0989..1170175 100644 --- a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/startcache/CacheConfigP2PStartClient.java +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/startcache/CacheConfigP2PStartClient.java @@ -35,25 +35,41 @@ public class CacheConfigP2PStartClient { public static void main(String[] args) throws Exception { IgniteConfiguration cfg = new IgniteConfiguration(); - Ignite ignite = Ignition.start(cfg); + try (Ignite ignite = Ignition.start(cfg)) { + int nodes = ignite.cluster().nodes().size(); - int nodes = ignite.cluster().nodes().size(); + if (nodes != 3) + throw new Exception("Unexpected nodes number: " + nodes); - if (nodes != 3) - throw new Exception("Unexpected nodes number: " + nodes); + CacheConfiguration<Integer, Organization1> ccfg1 = new CacheConfiguration<>(); - CacheConfiguration<Integer, Organization1> ccfg1 = new CacheConfiguration<>(); + ccfg1.setName("cache1"); - ccfg1.setNodeFilter(new CacheAllNodesFilter()); + ccfg1.setNodeFilter(new CacheAllNodesFilter()); - ccfg1.setIndexedTypes(Integer.class, Organization1.class); + ccfg1.setIndexedTypes(Integer.class, Organization1.class); - IgniteCache<Integer, Organization1> cache1 = ignite.createCache(ccfg1); + System.out.println("Create cache1."); - for (int i = 0; i < 500; i++) - cache1.put(i, new Organization1("org-" + i)); + IgniteCache<Integer, Organization1> cache1 = ignite.createCache(ccfg1); - Thread.sleep(5000); + for (int i = 0; i < 500; i++) + cache1.put(i, new Organization1("org-" + i)); + + System.out.println("Sleep some time."); + + Thread.sleep(5000); // Sleep some time to wait when connection of p2p loader is closed. + + System.out.println("Create cache2."); + + CacheConfiguration<Integer, Organization2> ccfg2 = new CacheConfiguration<>(); + + ccfg2.setName("cache2"); + + ccfg2.setIndexedTypes(Integer.class, Organization1.class); + + IgniteCache<Integer, Organization2> cache2 = ignite.createCache(ccfg2); + } } /**