# ignite-537

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/88e1ff88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/88e1ff88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/88e1ff88

Branch: refs/heads/ignite-721
Commit: 88e1ff8810ebc617e08c8eea554ec451f2df603c
Parents: 5029aeb
Author: sboikov <semen.boi...@inria.fr>
Authored: Thu Apr 9 23:49:42 2015 +0300
Committer: sboikov <semen.boi...@inria.fr>
Committed: Thu Apr 9 23:49:42 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  41 +---
 .../continuous/GridContinuousProcessor.java     |  25 ++-
 .../spi/discovery/DiscoverySpiDataExchange.java |   6 +-
 .../discovery/tcp/TcpClientDiscoverySpi.java    |  11 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 223 +++++++++++++++++--
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   |  13 --
 .../continuous/GridEventConsumeSelfTest.java    |   5 +-
 .../discovery/AbstractDiscoverySelfTest.java    |   5 +-
 .../tcp/TcpDiscoverySpiStartStopSelfTest.java   |   5 +-
 .../junits/spi/GridSpiAbstractTest.java         |   5 +-
 10 files changed, 253 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88e1ff88/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 7662bcc..04ff423 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.jdk.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.plugin.segmentation.*;
 import org.apache.ignite.spi.*;
@@ -172,9 +171,6 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     /** Map of dynamic cache filters. */
     private Map<String, CachePredicate> registeredCaches = new HashMap<>();
 
-    /** */
-    private JdkMarshaller jdkMarsh = new JdkMarshaller();
-
     /** @param ctx Context. */
     public GridDiscoveryManager(GridKernalContext ctx) {
         super(ctx, ctx.config().getDiscoverySpi());
@@ -421,10 +417,10 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         });
 
         spi.setDataExchange(new DiscoverySpiDataExchange() {
-            @Override public Map<Integer, byte[]> collect(UUID nodeId) {
+            @Override public Map<Integer, Object> collect(UUID nodeId) {
                 assert nodeId != null;
 
-                Map<Integer, byte[]> data = new HashMap<>();
+                Map<Integer, Object> data = new HashMap<>();
 
                 for (GridComponent comp : ctx.components()) {
                     Object compData = comp.collectDiscoveryData(nodeId);
@@ -432,48 +428,29 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     if (compData != null) {
                         assert comp.discoveryDataType() != null;
 
-                        try {
-                            byte[] bytes = jdkMarsh.marshal(compData);
-
-                            data.put(comp.discoveryDataType().ordinal(), 
bytes);
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to marshal discovery data " +
-                                "[comp=" + comp + ", data=" + compData + ']', 
e);
-                        }
+                        data.put(comp.discoveryDataType().ordinal(), compData);
                     }
                 }
 
                 return data;
             }
 
-            @Override public void onExchange(UUID joiningNodeId,
-                UUID nodeId,
-                Map<Integer, byte[]> data,
-                ClassLoader clsLdr) {
-                for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
+            @Override public void onExchange(UUID joiningNodeId, UUID nodeId, 
Map<Integer, Object> data) {
+                for (Map.Entry<Integer, Object> e : data.entrySet()) {
                     GridComponent comp = null;
 
                     for (GridComponent c : ctx.components()) {
-                        if (c.discoveryDataType() != null && 
c.discoveryDataType().ordinal() == entry.getKey()) {
+                        if (c.discoveryDataType() != null && 
c.discoveryDataType().ordinal() == e.getKey()) {
                             comp = c;
 
                             break;
                         }
                     }
 
-                    if (comp != null) {
-                        try {
-                            Object compData = 
jdkMarsh.unmarshal(entry.getValue(), clsLdr);
-
-                            comp.onDiscoveryDataReceived(joiningNodeId, 
nodeId, compData);
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to unmarshal discovery data 
for component: "  + comp, e);
-                        }
-                    }
+                    if (comp != null)
+                        comp.onDiscoveryDataReceived(joiningNodeId, nodeId, 
e.getValue());
                     else
-                        U.warn(log, "Received discovery data for unknown 
component: " + entry.getKey());
+                        U.warn(log, "Received discovery data for unknown 
component: " + e.getKey());
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88e1ff88/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 54da8e6..99b9c0c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -1559,7 +1560,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     /**
      * Discovery data.
      */
-    private static class DiscoveryData implements Externalizable {
+    private static class DiscoveryData implements Externalizable, 
GridPeerDeployAware {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1577,6 +1578,16 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             // No-op.
         }
 
+        @Override
+        public Class<?> deployClass() {
+            return U.peerDeployAware0(items).deployClass();
+        }
+
+        @Override
+        public ClassLoader classLoader() {
+            return U.peerDeployAware0(items).classLoader();
+        }
+
         /**
          * @param nodeId Node ID.
          */
@@ -1616,7 +1627,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     /**
      * Discovery data item.
      */
-    private static class DiscoveryDataItem implements Externalizable {
+    private static class DiscoveryDataItem implements Externalizable, 
GridPeerDeployAware {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1666,6 +1677,16 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             this.interval = interval;
         }
 
+        @Override
+        public Class<?> deployClass() {
+            return prjPred.getClass();
+        }
+
+        @Override
+        public ClassLoader classLoader() {
+            return prjPred.getClass().getClassLoader();
+        }
+
         /** {@inheritDoc} */
         @Override public void writeExternal(ObjectOutput out) throws 
IOException {
             U.writeUuid(out, routineId);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88e1ff88/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
index 06f1af8..1f64c87 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
@@ -33,7 +33,7 @@ public interface DiscoverySpiDataExchange {
      * @param joiningNodeId ID of new node that joins topology.
      * @return Collection of discovery data objects from different components.
      */
-    public Map<Integer, byte[]> collect(UUID joiningNodeId);
+    public Map<Integer, Object> collect(UUID joiningNodeId);
 
     /**
      * Notifies discovery manager about data received from remote node.
@@ -41,10 +41,8 @@ public interface DiscoverySpiDataExchange {
      * @param joiningNodeId Joining node ID.
      * @param nodeId Remote node ID for which data is provided.
      * @param data Collection of marshalled discovery data objects from 
different components.
-     * @param clsLdr Class loader to use for discovery data unmarshalling.
      */
     public void onExchange(UUID joiningNodeId,
         UUID nodeId,
-        Map<Integer, byte[]> data,
-        ClassLoader clsLdr);
+        Map<Integer, Object> data);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88e1ff88/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 6ced987..f613f4a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -897,10 +897,9 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
 
                         if (dataMap != null) {
                             for (Map.Entry<UUID, Map<Integer, byte[]>> entry : 
dataMap.entrySet()) {
-                                exchange.onExchange(newNodeId,
-                                    entry.getKey(),
-                                    entry.getValue(),
-                                    exchangeClassLoader(newNodeId));
+//                                exchange.onExchange(newNodeId,
+//                                    entry.getKey(),
+//                                    entry.getValue());
                             }
                         }
 
@@ -923,8 +922,8 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
 
                     Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
 
-                    if (data != null)
-                        exchange.onExchange(newNodeId, newNodeId, data, 
exchangeClassLoader(newNodeId));
+//                    if (data != null)
+//                        exchange.onExchange(newNodeId, newNodeId, data);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88e1ff88/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index ef22291..3fa6fd1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1377,6 +1377,87 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             log.debug("Discovery SPI has been connected to topology with 
order: " + locNode.internalOrder());
     }
 
+    /** */
+    private GridConcurrentHashSet<ClassLoader> exchangeLdrs = new 
GridConcurrentHashSet();
+
+    /**
+     * @param nodeId Node ID.
+     * @return Marshalled exchange data.
+     * @throws IgniteSpiException If failed.
+     */
+    protected Map<Integer, byte[]> collectExchangeData(UUID nodeId) throws 
IgniteSpiException {
+        boolean p2pEnaled = ignite.configuration().isPeerClassLoadingEnabled();
+
+        Map<Integer, Object> data = exchange.collect(nodeId);
+
+        Map<Integer, byte[]> data0 = U.newHashMap(data.size());
+
+        for (Map.Entry<Integer, Object> entry : data.entrySet()) {
+            try {
+                byte[] bytes = marsh.marshal(entry.getValue());
+
+                data0.put(entry.getKey(), bytes);
+
+                if (p2pEnaled) {
+                    ClassLoader ldr;
+
+                    if (entry.getValue() instanceof GridPeerDeployAware)
+                        ldr = 
((GridPeerDeployAware)entry.getValue()).classLoader();
+                    else
+                        ldr = entry.getValue().getClass().getClassLoader();
+
+                    log.info("Add loader: " + entry.getValue() + " " + ldr);
+
+                    if (ldr != null) {
+                        if (exchangeLdrs.add(ldr))
+                            log.info("Added loader: " + ldr);
+                    }
+                }
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to marshal discovery data " +
+                    "[comp=" + entry.getKey() + ", data=" + entry.getValue() + 
']', e);
+
+                throw new IgniteSpiException("Failed to marshal discovery 
data.", e);
+            }
+        }
+
+        return data0;
+    }
+
+    /**
+     * @param joiningNode Joining node.
+     * @param nodeId Remote node ID for which data is provided.
+     * @param data Collection of marshalled discovery data objects from 
different components.
+     */
+    protected void onExchange(TcpDiscoveryNode joiningNode,
+        UUID nodeId,
+        Map<Integer, byte[]> data) {
+        TcpDiscoveryNode node;
+
+        if (joiningNode.id().equals(nodeId))
+            node = joiningNode;
+        else
+            node = ring.node(nodeId);
+
+        ClassLoader clsLdr = exchangeClassLoader(node);
+
+        Map<Integer, Object> data0 = U.newHashMap(data.size());
+
+        for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
+            try {
+                Object compData = marsh.unmarshal(entry.getValue(), clsLdr);
+
+                data0.put(entry.getKey(), compData);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal discovery data for 
component: "  + entry.getKey(), e);
+            }
+        }
+
+        exchange.onExchange(joiningNode.id(), nodeId, data0);
+    }
+
     /**
      * Tries to send join request message to a random node presenting in 
topology.
      * Address is provided by {@link TcpDiscoveryIpFinder} and message is
@@ -1388,7 +1469,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
     @SuppressWarnings({"BusyWait"})
     private boolean sendJoinRequestMessage() throws IgniteSpiException {
         TcpDiscoveryAbstractMessage joinReq = new 
TcpDiscoveryJoinRequestMessage(locNode,
-            exchange.collect(getLocalNodeId()));
+            collectExchangeData(getLocalNodeId()));
 
         // Time when it has been detected, that addresses from IP finder do 
not respond.
         long noResStart = 0;
@@ -2198,7 +2279,11 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
     private TcpDiscoveryGetClassResponse 
processGetClassRequest(TcpDiscoveryGetClassRequest req) {
         assert !F.isEmpty(req.className()) : req;
 
-        InputStream in = getClass().getResourceAsStream(req.className());
+        String rsrc = U.classNameToResourceName(req.className());
+
+        log.info("Get class request: " + req.className() + " " + rsrc);
+
+        InputStream in = locLdr.getResourceAsStream(rsrc);
 
         byte[] clsBytes = null;
         String err = null;
@@ -3591,9 +3676,9 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                     Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
 
                     if (data != null)
-                        exchange.onExchange(node.id(), node.id(), data, 
exchangeClassLoader(node.id()));
+                        onExchange(node, node.id(), data);
 
-                    msg.addDiscoveryData(locNodeId, 
exchange.collect(node.id()));
+                    msg.addDiscoveryData(locNodeId, 
collectExchangeData(node.id()));
                 }
 
                 if (log.isDebugEnabled())
@@ -3662,12 +3747,8 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
 
                 // Notify outside of synchronized block.
                 if (dataMap != null) {
-                    for (Map.Entry<UUID, Map<Integer, byte[]>> entry : 
dataMap.entrySet()) {
-                        exchange.onExchange(node.id(),
-                            entry.getKey(),
-                            entry.getValue(),
-                            exchangeClassLoader(node.id()));
-                    }
+                    for (Map.Entry<UUID, Map<Integer, byte[]>> entry : 
dataMap.entrySet())
+                        onExchange(node, entry.getKey(), entry.getValue());
                 }
             }
 
@@ -5221,30 +5302,109 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             U.closeQuiet(sock);
         }
     }
+
+    /** */
+    private ConcurrentMap<UUID, DiscoveryDeploymentClassLoader> ldrs = new 
ConcurrentHashMap<>();
+
+    /** */
+    private LocalDeploymentClassLoader locLdr = new 
LocalDeploymentClassLoader(null);
+
+    /**
+     *
+     */
+    class LocalDeploymentClassLoader extends ClassLoader {
+        public LocalDeploymentClassLoader(ClassLoader parent) {
+            super(parent);
+        }
+
+        @Override
+        public InputStream getResourceAsStream(String name) {
+            log.info("Local getResourceAsStream: " + name);
+
+            for (ClassLoader ldr : exchangeLdrs) {
+                InputStream  in = ldr.getResourceAsStream(name);
+
+                if (in != null)
+                    return in;
+            }
+
+            return super.getResourceAsStream(name);
+        }
+
+        @Override public Class<?> loadClass(String name) throws 
ClassNotFoundException {
+            log.info("Local loadClass: " + name);
+
+            for (ClassLoader ldr : exchangeLdrs) {
+                try {
+                    return ldr.loadClass(name);
+                }
+                catch (ClassNotFoundException ignore) {
+                }
+            }
+
+            return super.loadClass(name);
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @return Class loader for exchange data unmarshalling.
+     */
+    protected ClassLoader exchangeClassLoader(TcpDiscoveryNode node) {
+        assert ignite != null;
+
+        UUID nodeId = node.id();
+
+        if (!ignite.configuration().isPeerClassLoadingEnabled())
+            return null;
+
+        if (nodeId.equals(getLocalNodeId()))
+            return locLdr;
+
+        DiscoveryDeploymentClassLoader ldr = ldrs.get(nodeId);
+
+        if (ldr == null)
+            ldr = F.addIfAbsent(ldrs, nodeId, new 
DiscoveryDeploymentClassLoader(node));
+
+        return ldr;
+    }
+
     /**
      *
      */
     private class DiscoveryDeploymentClassLoader extends ClassLoader {
         /** */
-        private final UUID nodeId;
+        private final TcpDiscoveryNode node;
 
         /** */
         private Socket sock;
 
         /**
-         * @param nodeId Node ID.
+         * @param node Node.
          */
-        public DiscoveryDeploymentClassLoader(UUID nodeId) {
-            this.nodeId = nodeId;
+        public DiscoveryDeploymentClassLoader(TcpDiscoveryNode node) {
+            assert !node.id().equals(getLocalNodeId());
+
+            this.node = node;
+        }
+
+        @Override protected Class<?> loadClass(String name, boolean resolve) 
throws ClassNotFoundException {
+            // log.info("P2p load class: " + name);
+
+            return super.loadClass(name, resolve);
         }
 
         /** {@inheritDoc} */
         @Override protected Class<?> findClass(String name) throws 
ClassNotFoundException {
+            log.info("P2p find class: " + name);
+
             TcpDiscoveryGetClassResponse res = requestClass(name);
 
             if (res.error() != null)
                 throw new ClassNotFoundException(res.error());
 
+            log.info("P2p loaded: " + name);
+
             assert res.classBytes() != null;
 
             Class<?> cls = defineClass(name, res.classBytes(), 0, 
res.classBytes().length);
@@ -5272,16 +5432,45 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 return res;
             }
             catch (IOException | IgniteCheckedException e) {
+                e.printStackTrace();
+
                 throw new ClassNotFoundException("Failed to load class: " + 
name, e);
             }
         }
 
         private Socket connect() {
             if (sock == null) {
-                TcpDiscoveryNode node = ring.node(nodeId);
+                for (InetSocketAddress addr : getNodeAddresses(node, 
U.sameMacs(locNode, node))) {
+                    sock = connect(addr);
+
+                    if (sock != null)
+                        break;
+                }
+            }
+
+            return sock;
+        }
+
+        private Socket connect(InetSocketAddress addr) {
+            UUID locNodeId  = getLocalNodeId();
+
+            Socket sock = null;
+
+            for (int i = 0; i < reconCnt; i++) {
+                try {
+                    sock = openSocket(addr);
 
-                if (node == null)
-                    return null;
+                    writeToSocket(sock, new 
TcpDiscoveryHandshakeRequest(locNodeId));
+
+                    TcpDiscoveryHandshakeResponse res = readMessage(sock, 
null, netTimeout);
+
+                    break;
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    e.printStackTrace();
+
+                    U.closeQuiet(sock);
+                }
             }
 
             return sock;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88e1ff88/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 5712d25..9cde198 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -725,19 +725,6 @@ abstract class TcpDiscoverySpiAdapter extends 
IgniteSpiAdapter implements Discov
     }
 
     /**
-     * @param nodeId Node ID.
-     * @return Class loader for exchange data unmarshalling.
-     */
-    protected ClassLoader exchangeClassLoader(UUID nodeId) {
-        assert ignite != null;
-
-        if (!ignite.configuration().isPeerClassLoadingEnabled() || 
nodeId.equals(getLocalNodeId()))
-            return null;
-
-        return null;
-    }
-
-    /**
      * @param msg Message.
      * @return Whether delivery of the message is ensured.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88e1ff88/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index a51d1a8..4bd2901 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -763,7 +763,7 @@ public class GridEventConsumeSelfTest extends 
GridCommonAbstractTest implements
      * @throws Exception If failed.
      */
     // TODO: GG-6730
-    public void _testNodeJoinWithP2P() throws Exception {
+    public void testNodeJoinWithP2P() throws Exception {
         final Collection<UUID> nodeIds = new HashSet<>();
         final AtomicInteger cnt = new AtomicInteger();
         final CountDownLatch latch = new CountDownLatch(GRID_CNT + 1);
@@ -800,8 +800,7 @@ public class GridEventConsumeSelfTest extends 
GridCommonAbstractTest implements
             assertEquals(GRID_CNT + 1, cnt.get());
         }
         finally {
-            stopGrid("anotherGrid1");
-            stopGrid("anotherGrid2");
+            stopGrid("anotherGrid");
 
             grid(0).events().stopRemoteListen(consumeId);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88e1ff88/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
index 78710fe..91d3514 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
@@ -379,14 +379,13 @@ public abstract class AbstractDiscoverySelfTest<T extends 
IgniteSpi> extends Gri
                 });
 
                 spi.setDataExchange(new DiscoverySpiDataExchange() {
-                    @Override public Map<Integer, byte[]> collect(UUID nodeId) 
{
+                    @Override public Map<Integer, Object> collect(UUID nodeId) 
{
                         return new HashMap<>();
                     }
 
                     @Override public void onExchange(UUID joiningNodeId,
                         UUID nodeId,
-                        Map<Integer, byte[]> data,
-                        ClassLoader clsLdr) {
+                        Map<Integer, Object> data) {
                         // No-op.
                     }
                 });

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88e1ff88/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java
index 6692046..058d908 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java
@@ -44,14 +44,13 @@ public class TcpDiscoverySpiStartStopSelfTest extends 
GridSpiStartStopAbstractTe
     @GridSpiTestConfig
     public DiscoverySpiDataExchange getDataExchange() {
         return new DiscoverySpiDataExchange() {
-            @Override public Map<Integer, byte[]> collect(UUID nodeId) {
+            @Override public Map<Integer, Object> collect(UUID nodeId) {
                 return null;
             }
 
             @Override public void onExchange(UUID joiningNodeId,
                 UUID nodeId,
-                Map<Integer, byte[]> data,
-                ClassLoader clsLdr) {
+                Map<Integer, Object> data) {
                 // No-op.
             }
         };

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88e1ff88/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
index 7b83979..82f3d6c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
@@ -215,14 +215,13 @@ public abstract class GridSpiAbstractTest<T extends 
IgniteSpi> extends GridAbstr
             discoSpi.setMetricsProvider(createMetricsProvider());
 
             discoSpi.setDataExchange(new DiscoverySpiDataExchange() {
-                @Override public Map<Integer, byte[]> collect(UUID nodeId) {
+                @Override public Map<Integer, Object> collect(UUID nodeId) {
                     return new HashMap<>();
                 }
 
                 @Override public void onExchange(UUID joiningNodeId,
                     UUID nodeId,
-                    Map<Integer, byte[]> data,
-                    ClassLoader clsLdr) {
+                    Map<Integer, Object> data) {
                 }
             });
 

Reply via email to