http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java
new file mode 100644
index 0000000..82e29db
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.closure;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Tests for {@link GridClosureProcessor}.
+ */
+@GridCommonTest(group = "Closure Processor")
+public class GridClosureProcessorSelfTest extends GridCommonAbstractTest {
+    /** Number of grids started for tests. Should not be less than 2. */
+    private static final int NODES_CNT = 2;
+
+    /** Job sleep duration in order to initiate timeout exception. */
+    private static final long JOB_SLEEP = 200;
+
+    /** Timeout used in timed tests. */
+    private static final long JOB_TIMEOUT = 100;
+
+    /** IP finder. */
+    private final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setCacheConfiguration();
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ConstantConditions"})
+    @Override protected void beforeTestsStarted() throws Exception {
+        assert NODES_CNT >= 2;
+
+        startGrids(NODES_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        execCntr.set(0);
+    }
+
+    /** Execution counter for runnable and callable jobs. */
+    private static AtomicInteger execCntr = new AtomicInteger(0);
+
+    /**
+     * Test runnable job.
+     */
+    private static class TestRunnable implements IgniteRunnable {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** @{inheritDoc} */
+        @Override public void run() {
+            log.info("Runnable job executed on node: " + 
ignite.cluster().localNode().id());
+
+            assert ignite != null;
+
+            execCntr.incrementAndGet();
+        }
+    }
+
+    /**
+     * Base class for test callables.
+     */
+    private abstract static class AbstractTestCallable implements 
IgniteCallable<Integer> {
+        /** */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /** */
+        @IgniteLoggerResource
+        protected IgniteLogger log;
+    }
+
+    /**
+     * Test callable job.
+     */
+    private static class TestCallable extends AbstractTestCallable {
+        /** {@inheritDoc} */
+        @Override public Integer call() {
+            log.info("Callable job executed on node: " + 
ignite.cluster().localNode().id());
+
+            assert ignite != null;
+
+            return execCntr.incrementAndGet();
+        }
+    }
+
+    /**
+     * Test callable job which throws class not found exception.
+     */
+    private static class TestCallableError extends AbstractTestCallable 
implements Externalizable {
+        /**
+         *
+         */
+        public TestCallableError() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer call() {
+            log.info("Callable job executed on node: " + 
ignite.cluster().localNode().id());
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            throw new ClassNotFoundException();
+        }
+    }
+
+    /**
+     * Test callable job which sleeps for some time. Is used in timeout tests.
+     */
+    private static class TestCallableTimeout extends AbstractTestCallable {
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            Thread.sleep(JOB_SLEEP);
+
+            return null;
+        }
+    }
+
+    /**
+     * @param idx Node index.
+     * @param job Runnable job.
+     * @param p Optional node predicate.
+     * @return Future object.
+     * @throws IgniteCheckedException If failed.
+     */
+    private IgniteFuture<?> runAsync(int idx, Runnable job, @Nullable 
IgnitePredicate<ClusterNode> p)
+        throws IgniteCheckedException {
+        assert idx >= 0 && idx < NODES_CNT;
+        assert job != null;
+
+        execCntr.set(0);
+
+        IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : 
grid(idx).compute();
+
+        comp = comp.enableAsync();
+
+        comp.run(job);
+
+        return comp.future();
+    }
+
+    /**
+     * @param idx Node index.
+     * @param job Runnable job.
+     * @param p Optional node predicate.
+     * @return Future object.
+     * @throws IgniteCheckedException If failed.
+     */
+    private IgniteFuture<?> broadcast(int idx, Runnable job, @Nullable 
IgnitePredicate<ClusterNode> p)
+        throws IgniteCheckedException {
+        assert idx >= 0 && idx < NODES_CNT;
+        assert job != null;
+
+        execCntr.set(0);
+
+        ClusterGroup prj = grid(idx);
+
+        if (p != null)
+            prj = prj.forPredicate(p);
+
+        IgniteCompute comp = compute(prj).enableAsync();
+
+        comp.broadcast(job);
+
+        return comp.future();
+    }
+
+    /**
+     * @param idx Node index.
+     * @param jobs Runnable jobs.
+     * @param p Optional node predicate.
+     * @return Future object.
+     * @throws IgniteCheckedException If failed.
+     */
+    private IgniteFuture<?> runAsync(int idx, Collection<TestRunnable> jobs, 
@Nullable IgnitePredicate<ClusterNode> p)
+        throws IgniteCheckedException {
+        assert idx >= 0 && idx < NODES_CNT;
+        assert !F.isEmpty(jobs);
+
+        execCntr.set(0);
+
+        IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : 
grid(idx).compute();
+
+        comp = comp.enableAsync();
+
+        comp.run(jobs);
+
+        return comp.future();
+    }
+
+    /**
+     * @param idx Node index.
+     * @param job Callable job.
+     * @param p Optional node predicate.
+     * @return Future object.
+     * @throws IgniteCheckedException If failed.
+     */
+    private IgniteFuture<Integer> callAsync(int idx, Callable<Integer> job, 
@Nullable IgnitePredicate<ClusterNode> p)
+        throws IgniteCheckedException {
+        assert idx >= 0 && idx < NODES_CNT;
+        assert job != null;
+
+        execCntr.set(0);
+
+        IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : 
grid(idx).compute();
+
+        comp = comp.enableAsync();
+
+        comp.call(job);
+
+        return comp.future();
+    }
+
+    /**
+     * @param idx Node index.
+     * @param job Callable job.
+     * @param p Optional node predicate.
+     * @return Future object.
+     * @throws IgniteCheckedException If failed.
+     */
+    private IgniteFuture<Collection<Integer>> broadcast(int idx, 
Callable<Integer> job,
+        @Nullable IgnitePredicate<ClusterNode> p) throws 
IgniteCheckedException {
+        assert idx >= 0 && idx < NODES_CNT;
+        assert job != null;
+
+        execCntr.set(0);
+
+        IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : 
grid(idx).compute();
+
+        comp = comp.enableAsync();
+
+        comp.broadcast(job);
+
+        return comp.future();
+    }
+
+    /**
+     * @param idx Node index.
+     * @param jobs Callable job.
+     * @param p Optional node predicate.
+     * @return Future object.
+     * @throws IgniteCheckedException If failed.
+     */
+    private IgniteFuture<Collection<Integer>> callAsync(int idx, 
Collection<TestCallable> jobs,
+        @Nullable IgnitePredicate<ClusterNode> p) throws 
IgniteCheckedException {
+        assert idx >= 0 && idx < NODES_CNT;
+        assert !F.isEmpty(jobs);
+
+        execCntr.set(0);
+
+        IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : 
grid(idx).compute();
+
+        comp = comp.enableAsync();
+
+        comp.call(jobs);
+
+        return comp.future();
+    }
+
+    /**
+     * @param idx Node index.
+     * @return Predicate.
+     */
+    private IgnitePredicate<ClusterNode> singleNodePredicate(final int idx) {
+        assert idx >= 0 && idx < NODES_CNT;
+
+        return new IgnitePredicate<ClusterNode>() {
+            @Override public boolean apply(ClusterNode e) { return 
grid(idx).localNode().id().equals(e.id()); }
+        };
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRunAsyncSingle() throws Exception {
+        Runnable job = new TestRunnable();
+
+        IgniteFuture<?> fut = broadcast(0, job, null);
+
+        assert fut.get() == null;
+
+        assertEquals(NODES_CNT, execCntr.getAndSet(0));
+
+        fut = broadcast(0, job, singleNodePredicate(0));
+
+        assert fut.get() == null;
+
+        assertEquals(1, execCntr.get());
+
+        fut = runAsync(0, job, null);
+
+        assert fut.get() == null : "Execution result must be null.";
+
+        assert execCntr.get() == 1 :
+            "Execution counter must be equal to 1, actual: " + execCntr.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRunAsyncMultiple() throws Exception {
+        Collection<TestRunnable> jobs = F.asList(new TestRunnable(), new 
TestRunnable());
+
+        IgniteFuture<?> fut = runAsync(0, jobs, null);
+
+        assert fut.get() == null : "Execution result must be null.";
+
+        assert execCntr.get() == jobs.size() :
+            "Execution counter must be equal to " + jobs.size() + ", actual: " 
+ execCntr.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCallAsyncSingle() throws Exception {
+        Callable<Integer> job = new TestCallable();
+
+        IgniteFuture<Collection<Integer>> fut1 = broadcast(0, job, null);
+
+        assert fut1.get() != null;
+
+        assertEquals(NODES_CNT, execCntr.getAndSet(0));
+
+        fut1 = broadcast(0, job, singleNodePredicate(0));
+
+        // We left one node so we can get definite result.
+        assertEquals(Integer.valueOf(1), F.first(fut1.get()));
+
+        assertEquals(1, execCntr.get());
+
+        IgniteFuture<Integer> fut2 = callAsync(0, job, null);
+
+        assert fut2.get() == 1 :
+            "Execution result must be equal to 1, actual: " + fut2.get();
+
+        assert execCntr.get() == 1 :
+            "Execution counter must be equal to 1, actual: " + execCntr.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCallAsyncErrorNoFailover() throws Exception {
+        IgniteCompute comp = 
compute(grid(0).forPredicate(F.notEqualTo(grid(0).localNode()))).enableAsync();
+
+        comp.withNoFailover().call(new TestCallableError());
+
+        IgniteFuture<Integer> fut = comp.future();
+
+        try {
+            fut.get();
+
+            assert false : "Exception should have been thrown.";
+        }
+        catch (IgniteCheckedException e) {
+            info("Caught expected exception: " + e);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWithName() throws Exception {
+        grid(0).compute().withName("TestTaskName").call(new TestCallable());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWithTimeout() throws Exception {
+        Collection<TestCallableTimeout> jobs = F.asList(new 
TestCallableTimeout());
+
+        boolean timedOut = false;
+
+        try {
+            // Ensure that we will get timeout exception.
+            grid(0).compute().withTimeout(JOB_TIMEOUT).call(jobs);
+        }
+        catch (ComputeTaskTimeoutException ignore) {
+            timedOut = true;
+        }
+
+        assert timedOut : "Task has not timed out.";
+
+        timedOut = false;
+
+        try {
+            // Previous task invocation cleared the timeout.
+            grid(0).compute().call(jobs);
+        }
+        catch (ComputeTaskTimeoutException ignore) {
+            timedOut = true;
+        }
+
+        assert !timedOut : "Subsequently called task has timed out.";
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCallAsyncMultiple() throws Exception {
+        Collection<TestCallable> jobs = F.asList(new TestCallable(), new 
TestCallable());
+
+        IgniteFuture<Collection<Integer>> fut = callAsync(0, jobs, null);
+
+        Collection<Integer> results = fut.get();
+
+        assert !results.isEmpty() : "Collection of results is empty.";
+
+        assert results.size() == jobs.size() :
+            "Collection of results must be of size: " + jobs.size() + ".";
+
+        for (int i = 1; i <= jobs.size(); i++)
+            assert results.contains(i) : "Collection of results does not 
contain value: " + i;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReduceAsync() throws Exception {
+        Collection<TestCallable> jobs = F.asList(new TestCallable(), new 
TestCallable());
+
+        IgniteCompute comp = grid(0).compute().enableAsync();
+
+        comp.call(jobs, F.sumIntReducer());
+
+        IgniteFuture<Integer> fut = comp.future();
+
+        // Sum of arithmetic progression.
+        int exp = (1 + jobs.size()) * jobs.size() / 2;
+
+        assert fut.get() == exp :
+            "Execution result must be equal to " + exp + ", actual: " + 
fut.get();
+
+        assert execCntr.get() == jobs.size() :
+            "Execution counter must be equal to " + jobs.size() + ", actual: " 
+ execCntr.get();
+
+        execCntr.set(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReducerError() throws Exception {
+        final Ignite g = grid(0);
+
+        final Collection<Callable<Integer>> jobs = new ArrayList<>();
+
+        for (int i = 0; i < g.cluster().nodes().size(); i++) {
+            jobs.add(new IgniteCallable<Integer>() {
+                @Override public Integer call() throws Exception {
+                    throw new RuntimeException("Test exception.");
+                }
+            });
+        }
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                g.compute().call(jobs, new IgniteReducer<Integer, Object>() {
+                    @Override public boolean collect(@Nullable Integer e) {
+                        fail("Expects failed jobs never call 'collect' 
method.");
+
+                        return true;
+                    }
+
+                    @Override public Object reduce() {
+                        return null;
+                    }
+                });
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/package.html
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/package.html
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/package.html
new file mode 100644
index 0000000..1f85ff2
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/package.html
@@ -0,0 +1,23 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 
"http://www.w3.org/TR/html4/loose.dtd";>
+<html>
+<body>
+    <!-- Package description. -->
+    Contains internal tests or test related classes and interfaces.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
new file mode 100644
index 0000000..f016369
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -0,0 +1,1079 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+import static 
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.*;
+
+/**
+ * Event consume test.
+ */
+public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final String PRJ_PRED_CLS_NAME = 
"org.gridgain.grid.tests.p2p.GridEventConsumeProjectionPredicate";
+
+    /** */
+    private static final String FILTER_CLS_NAME = 
"org.gridgain.grid.tests.p2p.GridEventConsumeFilter";
+
+    /** Grids count. */
+    private static final int GRID_CNT = 3;
+
+    /** Number of created consumes per thread in multithreaded test. */
+    private static final int CONSUME_CNT = 500;
+
+    /** Consume latch. */
+    private static volatile CountDownLatch consumeLatch;
+
+    /** Consume counter. */
+    private static volatile AtomicInteger consumeCnt;
+
+    /** Include node flag. */
+    private boolean include;
+
+    /** No automatic unsubscribe flag. */
+    private boolean noAutoUnsubscribe;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (include)
+            cfg.setUserAttributes(F.asMap("include", true));
+
+        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        assertTrue(GRID_CNT > 1);
+
+        include = true;
+
+        startGridsMultiThreaded(GRID_CNT - 1);
+
+        include = false;
+
+        startGrid(GRID_CNT - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        assertEquals(GRID_CNT, grid(0).nodes().size());
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            GridKernal grid = (GridKernal)grid(i);
+
+            GridContinuousProcessor proc = grid.context().continuous();
+
+            if (noAutoUnsubscribe) {
+                localRoutines(proc).clear();
+
+                U.<Map>field(proc, "rmtInfos").clear();
+            }
+
+            assertEquals(0, localRoutines(proc).size());
+            assertEquals(0, U.<Map>field(proc, "rmtInfos").size());
+            assertEquals(0, U.<Map>field(proc, "startFuts").size());
+            assertEquals(0, U.<Map>field(proc, "waitForStartAck").size());
+            assertEquals(0, U.<Map>field(proc, "stopFuts").size());
+            assertEquals(0, U.<Map>field(proc, "waitForStopAck").size());
+            assertEquals(0, U.<Map>field(proc, "pending").size());
+        }
+    }
+
+    /**
+     * @param proc Continuous processor.
+     * @return Local event routines.
+     */
+    private Collection<LocalRoutineInfo> localRoutines(GridContinuousProcessor 
proc) {
+        return F.view(U.<Map<UUID, LocalRoutineInfo>>field(proc, 
"locInfos").values(),
+            new IgnitePredicate<LocalRoutineInfo>() {
+                @Override public boolean apply(LocalRoutineInfo info) {
+                    return info.handler().isForEvents();
+                }
+            });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testApi() throws Exception {
+        try {
+            grid(0).events().stopRemoteListen(null);
+        }
+        catch (NullPointerException ignored) {
+            // No-op.
+        }
+
+        grid(0).events().stopRemoteListen(UUID.randomUUID());
+
+        UUID consumeId = null;
+
+        try {
+            consumeId = grid(0).events().remoteListen(
+                new P2<UUID, IgniteDiscoveryEvent>() {
+                    @Override public boolean apply(UUID uuid, 
IgniteDiscoveryEvent evt) {
+                        return false;
+                    }
+                },
+                new P1<IgniteDiscoveryEvent>() {
+                    @Override public boolean apply(IgniteDiscoveryEvent e) {
+                        return false;
+                    }
+                },
+                EVTS_DISCOVERY
+            );
+
+            assertNotNull(consumeId);
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+
+        try {
+            consumeId = grid(0).events().remoteListen(
+                new P2<UUID, IgniteDiscoveryEvent>() {
+                    @Override public boolean apply(UUID uuid, 
IgniteDiscoveryEvent evt) {
+                        return false;
+                    }
+                },
+                new P1<IgniteDiscoveryEvent>() {
+                    @Override public boolean apply(IgniteDiscoveryEvent e) {
+                        return false;
+                    }
+                }
+            );
+
+            assertNotNull(consumeId);
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+
+        try {
+            consumeId = grid(0).events().remoteListen(
+                new P2<UUID, IgniteEvent>() {
+                    @Override public boolean apply(UUID uuid, IgniteEvent evt) 
{
+                        return false;
+                    }
+                },
+                new P1<IgniteEvent>() {
+                    @Override public boolean apply(IgniteEvent e) {
+                        return false;
+                    }
+                }
+            );
+
+            assertNotNull(consumeId);
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAllEvents() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
+
+        UUID consumeId = grid(0).events().remoteListen(
+            new P2<UUID, IgniteEvent>() {
+                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    if (evt.type() == EVT_JOB_STARTED) {
+                        nodeIds.add(nodeId);
+                        cnt.incrementAndGet();
+                        latch.countDown();
+                    }
+
+                    return true;
+                }
+            },
+            null
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            grid(0).compute().broadcast(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(GRID_CNT, nodeIds.size());
+            assertEquals(GRID_CNT, cnt.get());
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEventsByType() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
+
+        UUID consumeId = grid(0).events().remoteListen(
+            new P2<UUID, IgniteEvent>() {
+                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+
+                    nodeIds.add(nodeId);
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return true;
+                }
+            },
+            null,
+            EVT_JOB_STARTED
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            grid(0).compute().broadcast(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(GRID_CNT, nodeIds.size());
+            assertEquals(GRID_CNT, cnt.get());
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEventsByFilter() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
+
+        UUID consumeId = grid(0).events().remoteListen(
+            new P2<UUID, IgniteEvent>() {
+                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+
+                    nodeIds.add(nodeId);
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return true;
+                }
+            },
+            new P1<IgniteEvent>() {
+                @Override public boolean apply(IgniteEvent evt) {
+                    return evt.type() == EVT_JOB_STARTED;
+                }
+            }
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            grid(0).compute().broadcast(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(GRID_CNT, nodeIds.size());
+            assertEquals(GRID_CNT, cnt.get());
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEventsByTypeAndFilter() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
+
+        UUID consumeId = grid(0).events().remoteListen(
+            new P2<UUID, IgniteJobEvent>() {
+                @Override public boolean apply(UUID nodeId, IgniteJobEvent 
evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+
+                    nodeIds.add(nodeId);
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return true;
+                }
+            },
+            new P1<IgniteJobEvent>() {
+                @Override public boolean apply(IgniteJobEvent evt) {
+                    return !"exclude".equals(evt.taskName());
+                }
+            },
+            EVT_JOB_STARTED
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            grid(0).compute().broadcast(F.noop());
+            grid(0).compute().withName("exclude").run(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(GRID_CNT, nodeIds.size());
+            assertEquals(GRID_CNT, cnt.get());
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoteProjection() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1);
+
+        UUID consumeId = events(grid(0).forRemotes()).remoteListen(
+            new P2<UUID, IgniteEvent>() {
+                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+
+                    nodeIds.add(nodeId);
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return true;
+                }
+            },
+            null,
+            EVT_JOB_STARTED
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            grid(0).compute().broadcast(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(GRID_CNT - 1, nodeIds.size());
+            assertEquals(GRID_CNT - 1, cnt.get());
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testProjectionWithLocalNode() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1);
+
+        UUID consumeId = events(grid(0).forAttribute("include", 
null)).remoteListen(
+            new P2<UUID, IgniteEvent>() {
+                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+
+                    nodeIds.add(nodeId);
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return true;
+                }
+            },
+            null,
+            EVT_JOB_STARTED
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            grid(0).compute().broadcast(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(GRID_CNT - 1, nodeIds.size());
+            assertEquals(GRID_CNT - 1, cnt.get());
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalNodeOnly() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        UUID consumeId = events(grid(0).forLocal()).remoteListen(
+            new P2<UUID, IgniteEvent>() {
+                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+
+                    nodeIds.add(nodeId);
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return true;
+                }
+            },
+            null,
+            EVT_JOB_STARTED
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            grid(0).compute().broadcast(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(1, nodeIds.size());
+            assertEquals(1, cnt.get());
+
+            assertEquals(grid(0).localNode().id(), F.first(nodeIds));
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEmptyProjection() throws Exception {
+        try {
+            
events(grid(0).forPredicate(F.<ClusterNode>alwaysFalse())).remoteListen(
+                new P2<UUID, IgniteEvent>() {
+                    @Override public boolean apply(UUID nodeId, IgniteEvent 
evt) {
+                        return true;
+                    }
+                },
+                null
+            );
+
+            assert false : "Exception was not thrown.";
+        }
+        catch (IgniteCheckedException e) {
+            assertTrue(e.getMessage().startsWith(
+                "Failed to register remote continuous listener (projection is 
empty)."));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStopByCallback() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        UUID consumeId = grid(0).events().remoteListen(
+            new P2<UUID, IgniteEvent>() {
+                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+
+                    nodeIds.add(nodeId);
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return false;
+                }
+            },
+            null,
+            EVT_JOB_STARTED
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            grid(0).compute().broadcast(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(1, nodeIds.size());
+            assertEquals(1, cnt.get());
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStopRemoteListen() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        UUID consumeId = grid(0).events().remoteListen(
+            new P2<UUID, IgniteEvent>() {
+                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+
+                    nodeIds.add(nodeId);
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return true;
+                }
+            },
+            null,
+            EVT_JOB_STARTED
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            grid(0).compute().run(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(1, nodeIds.size());
+            assertEquals(1, cnt.get());
+
+            grid(0).events().stopRemoteListen(consumeId);
+
+            grid(0).compute().run(F.noop());
+
+            U.sleep(500);
+
+            assertEquals(1, nodeIds.size());
+            assertEquals(1, cnt.get());
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStopLocalListenByCallback() throws Exception {
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        grid(0).events().localListen(
+            new P1<IgniteEvent>() {
+                @Override public boolean apply(IgniteEvent evt) {
+                    info("Local event [" + evt.shortDisplay() + ']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return false;
+                }
+            },
+            EVT_JOB_STARTED);
+
+        compute(grid(0).forLocal()).run(F.noop());
+
+        assert latch.await(2, SECONDS);
+
+        assertEquals(1, cnt.get());
+
+        compute(grid(0).forLocal()).run(F.noop());
+
+        U.sleep(500);
+
+        assertEquals(1, cnt.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeJoin() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT + 1);
+
+        UUID consumeId = grid(0).events().remoteListen(
+            new P2<UUID, IgniteEvent>() {
+                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+
+                    nodeIds.add(nodeId);
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return true;
+                }
+            },
+            new P1<IgniteEvent>() {
+                @Override public boolean apply(IgniteEvent evt) {
+                    return evt.type() == EVT_JOB_STARTED;
+                }
+            },
+            EVT_JOB_STARTED, EVT_JOB_FINISHED
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            include = true;
+
+            startGrid("anotherGrid");
+
+            grid(0).compute().broadcast(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(GRID_CNT + 1, nodeIds.size());
+            assertEquals(GRID_CNT + 1, cnt.get());
+        }
+        finally {
+            stopGrid("anotherGrid");
+
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeJoinWithProjection() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
+
+        UUID consumeId = events(grid(0).forAttribute("include", 
null)).remoteListen(
+            new P2<UUID, IgniteEvent>() {
+                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+
+                    nodeIds.add(nodeId);
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return true;
+                }
+            },
+            null,
+            EVT_JOB_STARTED
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            include = true;
+
+            startGrid("anotherGrid1");
+
+            include = false;
+
+            startGrid("anotherGrid2");
+
+            grid(0).compute().broadcast(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(GRID_CNT, nodeIds.size());
+            assertEquals(GRID_CNT, cnt.get());
+        }
+        finally {
+            stopGrid("anotherGrid1");
+            stopGrid("anotherGrid2");
+
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    // TODO: GG-6730
+    public void _testNodeJoinWithP2P() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT + 1);
+
+        ClassLoader ldr = getExternalClassLoader();
+
+        IgnitePredicate<ClusterNode> prjPred = 
(IgnitePredicate<ClusterNode>)ldr.loadClass(PRJ_PRED_CLS_NAME).newInstance();
+        IgnitePredicate<IgniteEvent> filter = 
(IgnitePredicate<IgniteEvent>)ldr.loadClass(FILTER_CLS_NAME).newInstance();
+
+        UUID consumeId = 
events(grid(0).forPredicate(prjPred)).remoteListen(new P2<UUID, IgniteEvent>() {
+            @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
+
+                assertEquals(EVT_JOB_STARTED, evt.type());
+
+                nodeIds.add(nodeId);
+                cnt.incrementAndGet();
+                latch.countDown();
+
+                return true;
+            }
+        }, filter, EVT_JOB_STARTED);
+
+        try {
+            assertNotNull(consumeId);
+
+            startGrid("anotherGrid");
+
+            grid(0).compute().broadcast(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(GRID_CNT + 1, nodeIds.size());
+            assertEquals(GRID_CNT + 1, cnt.get());
+        }
+        finally {
+            stopGrid("anotherGrid1");
+            stopGrid("anotherGrid2");
+
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testResources() throws Exception {
+        final Collection<UUID> nodeIds = new HashSet<>();
+        final AtomicInteger cnt = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
+
+        UUID consumeId = grid(0).events().remoteListen(
+            new P2<UUID, IgniteEvent>() {
+                @IgniteInstanceResource
+                private Ignite grid;
+
+                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
+                    info("Event from " + nodeId + " [" + evt.shortDisplay() + 
']');
+
+                    assertEquals(EVT_JOB_STARTED, evt.type());
+                    assertNotNull(grid);
+
+                    nodeIds.add(nodeId);
+                    cnt.incrementAndGet();
+                    latch.countDown();
+
+                    return true;
+                }
+            },
+            new P1<IgniteEvent>() {
+                @IgniteInstanceResource
+                private Ignite grid;
+
+                @Override public boolean apply(IgniteEvent evt) {
+                    assertNotNull(grid);
+
+                    return true;
+                }
+            },
+            EVT_JOB_STARTED
+        );
+
+        try {
+            assertNotNull(consumeId);
+
+            grid(0).compute().broadcast(F.noop());
+
+            assert latch.await(2, SECONDS);
+
+            assertEquals(GRID_CNT, nodeIds.size());
+            assertEquals(GRID_CNT, cnt.get());
+        }
+        finally {
+            grid(0).events().stopRemoteListen(consumeId);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMasterNodeLeave() throws Exception {
+        Ignite g = startGrid("anotherGrid");
+
+        final UUID nodeId = g.cluster().localNode().id();
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() {
+                @Override public boolean apply(IgniteEvent evt) {
+                    if (nodeId.equals(((IgniteDiscoveryEvent) 
evt).eventNode().id()))
+                        latch.countDown();
+
+                    return true;
+                }
+            }, EVT_NODE_LEFT);
+        }
+
+        g.events().remoteListen(
+            null,
+            new P1<IgniteEvent>() {
+                @Override public boolean apply(IgniteEvent evt) {
+                    return true;
+                }
+            },
+            EVTS_ALL
+        );
+
+        stopGrid("anotherGrid");
+
+        latch.await();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMasterNodeLeaveNoAutoUnsubscribe() throws Exception {
+        Ignite g = startGrid("anotherGrid");
+
+        final UUID nodeId = g.cluster().localNode().id();
+        final CountDownLatch discoLatch = new CountDownLatch(GRID_CNT);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() {
+                @Override public boolean apply(IgniteEvent evt) {
+                    if (nodeId.equals(((IgniteDiscoveryEvent) 
evt).eventNode().id()))
+                        discoLatch.countDown();
+
+                    return true;
+                }
+            }, EVT_NODE_LEFT);
+        }
+
+        consumeLatch = new CountDownLatch(GRID_CNT * 2 + 1);
+        consumeCnt = new AtomicInteger();
+
+        noAutoUnsubscribe = true;
+
+        g.events().remoteListen(
+            1, 0, false,
+            null,
+            new P1<IgniteEvent>() {
+                @Override public boolean apply(IgniteEvent evt) {
+                    consumeLatch.countDown();
+                    consumeCnt.incrementAndGet();
+
+                    return true;
+                }
+            },
+            EVT_JOB_STARTED
+        );
+
+        grid(0).compute().broadcast(F.noop());
+
+        stopGrid("anotherGrid");
+
+        discoLatch.await();
+
+        grid(0).compute().broadcast(F.noop());
+
+        assert consumeLatch.await(2, SECONDS);
+
+        assertEquals(GRID_CNT * 2 + 1, consumeCnt.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedWithNodeRestart() throws Exception {
+        final AtomicBoolean stop = new AtomicBoolean();
+        final BlockingQueue<IgniteBiTuple<Integer, UUID>> queue = new 
LinkedBlockingQueue<>();
+        final Collection<UUID> started = new GridConcurrentHashSet<>();
+        final Collection<UUID> stopped = new GridConcurrentHashSet<>();
+
+        final Random rnd = new Random();
+
+        IgniteFuture<?> starterFut = multithreadedAsync(new Callable<Object>() 
{
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < CONSUME_CNT; i++) {
+                    int idx = rnd.nextInt(GRID_CNT);
+
+                    try {
+                        IgniteEvents evts = grid(idx).events().enableAsync();
+
+                        evts.remoteListen(new P2<UUID, IgniteEvent>() {
+                            @Override public boolean apply(UUID uuid, 
IgniteEvent evt) {
+                                return true;
+                            }
+                        }, null, EVT_JOB_STARTED);
+
+                        UUID consumeId = evts.<UUID>future().get(3000);
+
+                        started.add(consumeId);
+
+                        queue.add(F.t(idx, consumeId));
+                    }
+                    catch (ClusterTopologyException ignored) {
+                        // No-op.
+                    }
+
+                    U.sleep(10);
+                }
+
+                stop.set(true);
+
+                return null;
+            }
+        }, 8, "consume-starter");
+
+        IgniteFuture<?> stopperFut = multithreadedAsync(new Callable<Object>() 
{
+            @Override public Object call() throws Exception {
+                while (!stop.get()) {
+                    IgniteBiTuple<Integer, UUID> t = queue.poll(1, SECONDS);
+
+                    if (t == null)
+                        continue;
+
+                    int idx = t.get1();
+                    UUID consumeId = t.get2();
+
+                    try {
+                        IgniteEvents evts = grid(idx).events().enableAsync();
+
+                        evts.stopRemoteListen(consumeId);
+
+                        evts.future().get(3000);
+
+                        stopped.add(consumeId);
+                    }
+                    catch (ClusterTopologyException ignored) {
+                        // No-op.
+                    }
+                }
+
+                return null;
+            }
+        }, 4, "consume-stopper");
+
+        IgniteFuture<?> nodeRestarterFut = multithreadedAsync(new 
Callable<Object>() {
+            @Override public Object call() throws Exception {
+                while (!stop.get()) {
+                    startGrid("anotherGrid");
+                    stopGrid("anotherGrid");
+                }
+
+                return null;
+            }
+        }, 1, "node-restarter");
+
+        IgniteFuture<?> jobRunnerFut = multithreadedAsync(new 
Callable<Object>() {
+            @Override public Object call() throws Exception {
+                while (!stop.get()) {
+                    int idx = rnd.nextInt(GRID_CNT);
+
+                    try {
+                        IgniteCompute comp = grid(idx).compute().enableAsync();
+
+                        comp.run(F.noop());
+
+                        comp.future().get(3000);
+                    }
+                    catch (IgniteCheckedException ignored) {
+                        // Ignore all job execution related errors.
+                    }
+                }
+
+                return null;
+            }
+        }, 1, "job-runner");
+
+        starterFut.get();
+        stopperFut.get();
+        nodeRestarterFut.get();
+        jobRunnerFut.get();
+
+        IgniteBiTuple<Integer, UUID> t;
+
+        while ((t = queue.poll()) != null) {
+            int idx = t.get1();
+            UUID consumeId = t.get2();
+
+            IgniteEvents evts = grid(idx).events().enableAsync();
+
+            evts.stopRemoteListen(consumeId);
+
+            evts.future().get(3000);
+
+            stopped.add(consumeId);
+        }
+
+        Collection<UUID> notStopped = F.lose(started, true, stopped);
+
+        assertEquals("Not stopped IDs: " + notStopped, 0, notStopped.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridMessageListenSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridMessageListenSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridMessageListenSelfTest.java
new file mode 100644
index 0000000..1940aaa
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridMessageListenSelfTest.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.messaging.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * Message listen test.
+ */
+public class GridMessageListenSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final int GRID_CNT = 3;
+
+    /** */
+    private static final String INC_ATTR = "include";
+
+    /** */
+    private static final String MSG = "Message";
+
+    /** */
+    private static final String TOPIC = "Topic";
+
+    /** */
+    private static final int MSG_CNT = 3;
+
+    /** */
+    private static final String TOPIC_CLS_NAME = 
"org.gridgain.grid.tests.p2p.GridTestMessageTopic";
+
+    /** */
+    private static final String LSNR_CLS_NAME = 
"org.gridgain.grid.tests.p2p.GridTestMessageListener";
+
+    /** */
+    private static boolean include;
+
+    /** */
+    private static final List<UUID> allNodes = new ArrayList<>();
+
+    /** */
+    private static final List<UUID> rmtNodes = new ArrayList<>();
+
+    /** */
+    private static final List<UUID> incNodes = new ArrayList<>();
+
+    /** */
+    private static final Collection<UUID> nodes = new 
GridConcurrentHashSet<>();
+
+    /** */
+    private static final AtomicInteger cnt = new AtomicInteger();
+
+    /** */
+    private static CountDownLatch latch;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (include)
+            cfg.setUserAttributes(F.asMap(INC_ATTR, true));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        nodes.clear();
+        cnt.set(0);
+
+        include = true;
+
+        startGridsMultiThreaded(GRID_CNT - 1);
+
+        include = false;
+
+        Thread.sleep(500);
+
+        startGrid(GRID_CNT - 1);
+
+        allNodes.clear();
+        rmtNodes.clear();
+        incNodes.clear();
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            UUID id = grid(i).localNode().id();
+
+            allNodes.add(id);
+
+            if (i != 0)
+                rmtNodes.add(id);
+
+            if (i != GRID_CNT - 1)
+                incNodes.add(id);
+        }
+
+        Collections.sort(allNodes);
+        Collections.sort(rmtNodes);
+        Collections.sort(incNodes);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNullTopic() throws Exception {
+        latch = new CountDownLatch(MSG_CNT * GRID_CNT);
+
+        listen(grid(0), null, true);
+
+        send();
+
+        assert latch.await(2, SECONDS);
+
+        Thread.sleep(500);
+
+        assertEquals(MSG_CNT * GRID_CNT, cnt.get());
+
+        checkNodes(allNodes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonNullTopic() throws Exception {
+        latch = new CountDownLatch(MSG_CNT * GRID_CNT);
+
+        listen(grid(0), null, true);
+
+        send();
+
+        assert latch.await(2, SECONDS);
+
+        Thread.sleep(500);
+
+        assertEquals(MSG_CNT * GRID_CNT, cnt.get());
+
+        checkNodes(allNodes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStopListen() throws Exception {
+        latch = new CountDownLatch(GRID_CNT);
+
+        listen(grid(0), null, false);
+
+        send();
+
+        assert latch.await(2, SECONDS);
+
+        Thread.sleep(500);
+
+        int expCnt = cnt.get();
+
+        send();
+
+        Thread.sleep(1000);
+
+        assertEquals(expCnt, cnt.get());
+
+        checkNodes(allNodes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testProjection() throws Exception {
+        latch = new CountDownLatch(MSG_CNT * (GRID_CNT - 1));
+
+        listen(grid(0).forRemotes(), null, true);
+
+        send();
+
+        assert latch.await(2, SECONDS);
+
+        Thread.sleep(500);
+
+        assertEquals(MSG_CNT * (GRID_CNT - 1), cnt.get());
+
+        checkNodes(rmtNodes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeJoin() throws Exception {
+        latch = new CountDownLatch(MSG_CNT * (GRID_CNT + 1));
+
+        listen(grid(0), null, true);
+
+        try {
+            Ignite g = startGrid("anotherGrid");
+
+            send();
+
+            assert latch.await(2, SECONDS);
+
+            Thread.sleep(500);
+
+            assertEquals(MSG_CNT * (GRID_CNT + 1), cnt.get());
+
+            List<UUID> allNodes0 = new ArrayList<>(allNodes);
+
+            allNodes0.add(g.cluster().localNode().id());
+
+            Collections.sort(allNodes0);
+
+            checkNodes(allNodes0);
+        }
+        finally {
+            stopGrid("anotherGrid");
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeJoinWithProjection() throws Exception {
+        latch = new CountDownLatch(MSG_CNT * GRID_CNT);
+
+        listen(grid(0).forAttribute(INC_ATTR, null), null, true);
+
+        try {
+            include = true;
+
+            Ignite g = startGrid("anotherGrid1");
+
+            include = false;
+
+            startGrid("anotherGrid2");
+
+            send();
+
+            assert latch.await(2, SECONDS);
+
+            Thread.sleep(500);
+
+            assertEquals(MSG_CNT * GRID_CNT, cnt.get());
+
+            List<UUID> incNodes0 = new ArrayList<>(incNodes);
+
+            incNodes0.add(g.cluster().localNode().id());
+
+            Collections.sort(incNodes0);
+
+            checkNodes(incNodes0);
+        }
+        finally {
+            stopGrid("anotherGrid1");
+            stopGrid("anotherGrid2");
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNullTopicWithDeployment() throws Exception {
+        Class<?> cls = getExternalClassLoader().loadClass(LSNR_CLS_NAME);
+
+        grid(0).message().remoteListen(null, (IgniteBiPredicate<UUID, 
Object>)cls.newInstance());
+
+        send();
+
+        boolean s = GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return checkDeployedListeners(GRID_CNT);
+            }
+        }, 2000);
+
+        assertTrue(s);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonNullTopicWithDeployment() throws Exception {
+        ClassLoader ldr = getExternalClassLoader();
+
+        Class<?> topicCls = ldr.loadClass(TOPIC_CLS_NAME);
+        Class<?> lsnrCls = ldr.loadClass(LSNR_CLS_NAME);
+
+        Object topic = topicCls.newInstance();
+
+        grid(0).message().remoteListen(topic, (IgniteBiPredicate<UUID, 
Object>)lsnrCls.newInstance());
+
+        send(topic);
+
+        boolean s = GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return checkDeployedListeners(GRID_CNT);
+            }
+        }, 2000);
+
+        assertTrue(s);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testListenActor() throws Exception {
+        latch = new CountDownLatch(MSG_CNT * (GRID_CNT + 1));
+
+        grid(0).message().remoteListen(null, new Actor(grid(0)));
+
+        try {
+            Ignite g = startGrid("anotherGrid");
+
+            send();
+
+            assert latch.await(2, SECONDS);
+
+            Thread.sleep(500);
+
+            assertEquals(MSG_CNT * (GRID_CNT + 1), cnt.get());
+
+            List<UUID> allNodes0 = new ArrayList<>(allNodes);
+
+            allNodes0.add(g.cluster().localNode().id());
+
+            Collections.sort(allNodes0);
+
+            checkNodes(allNodes0);
+        }
+        finally {
+            stopGrid("anotherGrid");
+        }
+    }
+
+    /**
+     * @param prj Projection.
+     * @param topic Topic.
+     * @param ret Value returned from listener.
+     * @throws Exception In case of error.
+     */
+    private void listen(final ClusterGroup prj, @Nullable Object topic, final 
boolean ret) throws Exception {
+        assert prj != null;
+
+        message(prj).remoteListen(topic, new Listener(prj, ret));
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    private void send() throws Exception {
+        send(TOPIC);
+    }
+
+    /**
+     * @param topic Non-null topic.
+     * @throws Exception In case of error.
+     */
+    private void send(Object topic) throws Exception {
+        assert topic != null;
+
+        for (int i = 0; i < MSG_CNT; i++)
+            grid(0).message().send(null, MSG);
+
+        for (int i = 0; i < MSG_CNT; i++)
+            grid(0).message().send(topic, MSG);
+    }
+
+    /**
+     * @param expCnt Expected messages count.
+     * @return If check passed.
+     */
+    private boolean checkDeployedListeners(int expCnt) {
+        for (Ignite g : G.allGrids()) {
+            AtomicInteger cnt = g.cluster().<String, 
AtomicInteger>nodeLocalMap().get("msgCnt");
+
+            if (cnt == null || cnt.get() != expCnt)
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @param expNodes Expected nodes.
+     */
+    private void checkNodes(List<UUID> expNodes) {
+        List<UUID> nodes0 = new ArrayList<>(nodes);
+
+        Collections.sort(nodes0);
+
+        assertEquals(expNodes, nodes0);
+    }
+
+    /** */
+    private static class Listener implements P2<UUID, Object> {
+        /** */
+        private final ClusterGroup prj;
+
+        /** */
+        private final boolean ret;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /**
+         * @param prj Projection.
+         * @param ret Return value.
+         */
+        private Listener(ClusterGroup prj, boolean ret) {
+            this.prj = prj;
+            this.ret = ret;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID nodeId, Object msg) {
+            assertNotNull(ignite);
+            assertNotNull(ignite.configuration().getNodeId());
+
+            X.println("Received message [nodeId=" + nodeId + ", locNodeId=" + 
ignite.cluster().localNode().id() + ']');
+
+            assertEquals(prj.ignite().cluster().localNode().id(), nodeId);
+            assertEquals(MSG, msg);
+
+            nodes.add(ignite.configuration().getNodeId());
+            cnt.incrementAndGet();
+            latch.countDown();
+
+            return ret;
+        }
+    }
+
+    /** */
+    private static class Actor extends MessagingListenActor<Object> {
+        /** */
+        private final ClusterGroup prj;
+
+        /**
+         * @param prj Projection.
+         */
+        private Actor(ClusterGroup prj) {
+            this.prj = prj;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void receive(UUID nodeId, Object msg) throws 
Throwable {
+            assertNotNull(ignite());
+
+            UUID locNodeId = ignite().cluster().localNode().id();
+
+            X.println("Received message [nodeId=" + nodeId + ", locNodeId=" + 
locNodeId + ']');
+
+            assertEquals(prj.ignite().cluster().localNode().id(), nodeId);
+            assertEquals(MSG, msg);
+
+            nodes.add(locNodeId);
+            cnt.incrementAndGet();
+            latch.countDown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
new file mode 100644
index 0000000..e3d3397
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Tests for {@code GridDataLoaderImpl}.
+ */
+public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Number of keys to load via data loader. */
+    private static final int KEYS_COUNT = 1000;
+
+    /** Started grid counter. */
+    private static int cnt;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        // Forth node goes without cache.
+        if (cnt < 4)
+            cfg.setCacheConfiguration(cacheConfiguration());
+
+        cnt++;
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNullPointerExceptionUponDataLoaderClosing() throws 
Exception {
+        try {
+            startGrids(5);
+
+            final CyclicBarrier barrier = new CyclicBarrier(2);
+
+            multithreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    U.awaitQuiet(barrier);
+
+                    G.stopAll(true);
+
+                    return null;
+                }
+            }, 1);
+
+            Ignite g4 = grid(4);
+
+            IgniteDataLoader<Object, Object> dataLdr = g4.dataLoader(null);
+
+            dataLdr.perNodeBufferSize(32);
+
+            for (int i = 0; i < 100000; i += 2) {
+                dataLdr.addData(i, i);
+                dataLdr.removeData(i + 1);
+            }
+
+            U.awaitQuiet(barrier);
+
+            info("Closing data loader.");
+
+            try {
+                dataLdr.close(true);
+            }
+            catch (IllegalStateException ignore) {
+                // This is ok to ignore this exception as test is racy by it's 
nature -
+                // grid is stopping in different thread.
+            }
+        }
+        finally {
+            G.stopAll(true);
+        }
+    }
+
+    /**
+     * Data loader should correctly load entries from HashMap in case of grids 
with more than one node
+     *  and with GridOptimizedMarshaller that requires serializable.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAddDataFromMap() throws Exception {
+        try {
+            cnt = 0;
+
+            startGrids(2);
+
+            Ignite g0 = grid(0);
+
+            IgniteMarshaller marsh = g0.configuration().getMarshaller();
+
+            if (marsh instanceof IgniteOptimizedMarshaller)
+                
assertTrue(((IgniteOptimizedMarshaller)marsh).isRequireSerializable());
+            else
+                fail("Expected GridOptimizedMarshaller, but found: " + 
marsh.getClass().getName());
+
+            IgniteDataLoader<Integer, String> dataLdr = g0.dataLoader(null);
+
+            Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
+
+            for (int i = 0; i < KEYS_COUNT; i ++)
+                map.put(i, String.valueOf(i));
+
+            dataLdr.addData(map);
+
+            dataLdr.close();
+
+            Random rnd = new Random();
+
+            GridCache<Integer, String> c = g0.cache(null);
+
+            for (int i = 0; i < KEYS_COUNT; i ++) {
+                Integer k = rnd.nextInt(KEYS_COUNT);
+
+                String v = c.get(k);
+
+                assertEquals(k.toString(), v);
+            }
+        }
+        finally {
+            G.stopAll(true);
+        }
+    }
+
+    /**
+     * Gets cache configuration.
+     *
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setBackups(1);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return cacheCfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestObject implements Serializable {
+        /** */
+        private int val;
+
+        /**
+         */
+        private TestObject() {
+            // No-op.
+        }
+
+        /**
+         * @param val Value.
+         */
+        private TestObject(int val) {
+            this.val = val;
+        }
+
+        public Integer val() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof TestObject && ((TestObject)obj).val == val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
new file mode 100644
index 0000000..77d6b06
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jdk8.backport.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Data loader performance test. Compares group lock data loader to 
traditional lock.
+ * <p>
+ * Disable assertions and give at least 2 GB heap to run this test.
+ */
+public class GridDataLoaderPerformanceTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int GRID_CNT = 3;
+
+    /** */
+    private static final int ENTRY_CNT = 80000;
+
+    /** */
+    private boolean useCache;
+
+    /** */
+    private boolean useGrpLock;
+
+    /** */
+    private String[] vals = new String[2048];
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(spi);
+
+        cfg.setIncludeProperties();
+
+        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, 
EVT_JOB_MAPPED);
+
+        cfg.setRestEnabled(false);
+
+        cfg.setPeerClassLoadingEnabled(true);
+
+        if (useCache) {
+            CacheConfiguration cc = defaultCacheConfiguration();
+
+            cc.setCacheMode(PARTITIONED);
+
+            cc.setDistributionMode(PARTITIONED_ONLY);
+            cc.setWriteSynchronizationMode(FULL_SYNC);
+            cc.setStartSize(ENTRY_CNT / GRID_CNT);
+            cc.setSwapEnabled(false);
+
+            cc.setBackups(1);
+
+            cc.setStoreValueBytes(true);
+
+            cfg.setCacheSanityCheckEnabled(false);
+            cfg.setCacheConfiguration(cc);
+        }
+        else
+            cfg.setCacheConfiguration();
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        for (int i = 0; i < vals.length; i++) {
+            int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
+
+            StringBuilder sb = new StringBuilder();
+
+            for (int j = 0; j < valLen; j++)
+                sb.append('a' + ThreadLocalRandom8.current().nextInt(20));
+
+            vals[i] = sb.toString();
+
+            info("Value: " + vals[i]);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPerformance() throws Exception {
+        useGrpLock = false;
+
+        doTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPerformanceGroupLock() throws Exception {
+        useGrpLock = true;
+
+        doTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTest() throws Exception {
+        System.gc();
+        System.gc();
+        System.gc();
+
+        try {
+            useCache = true;
+
+            startGridsMultiThreaded(GRID_CNT);
+
+            useCache = false;
+
+            Ignite ignite = startGrid();
+
+            final IgniteDataLoader<Integer, String> ldr = 
ignite.dataLoader(null);
+
+            ldr.perNodeBufferSize(8192);
+            ldr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, 
String>groupLocked() :
+                GridDataLoadCacheUpdaters.<Integer, String>batchedSorted());
+            ldr.autoFlushFrequency(0);
+
+            final LongAdder cnt = new LongAdder();
+
+            long start = U.currentTimeMillis();
+
+            Thread t = new Thread(new Runnable() {
+                @SuppressWarnings("BusyWait")
+                @Override public void run() {
+                    while (true) {
+                        try {
+                            Thread.sleep(10000);
+                        }
+                        catch (InterruptedException ignored) {
+                            break;
+                        }
+
+                        info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
+                    }
+                }
+            });
+
+            t.setDaemon(true);
+
+            t.start();
+
+            int threadNum = 2;//Runtime.getRuntime().availableProcessors();
+
+            multithreaded(new Callable<Object>() {
+                @SuppressWarnings("InfiniteLoopStatement")
+                @Override public Object call() throws Exception {
+                    ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
+
+                    while (true) {
+                        int i = rnd.nextInt(ENTRY_CNT);
+
+                        ldr.addData(i, vals[rnd.nextInt(vals.length)]);
+
+                        cnt.increment();
+                    }
+                }
+            }, threadNum, "loader");
+
+            info("Closing loader...");
+
+            ldr.close(false);
+
+            long duration = U.currentTimeMillis() - start;
+
+            info("Finished performance test. Duration: " + duration + "ms.");
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

Reply via email to