http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessorSelfTest.java
new file mode 100644
index 0000000..4aff1af
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessorSelfTest.java
@@ -0,0 +1,970 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.EventType.*;
+
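+/**
+ * Tests for {@link IgniteDataStreamer}: adding and removing data in different cache modes,
+ * flushing, closing of the streamer, cache store updates and custom updaters.
+ */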
+public class DataStreamerProcessorSelfTest extends GridCommonAbstractTest {
+    /** Map backing {@link TestStore}; created and cleared by {@link #testUpdateStore()}. */
+    private static ConcurrentHashMap<Object, Object> storeMap;
+
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Cache mode applied to the cache configuration. */
+    private CacheMode mode = PARTITIONED;
+
+    /** If {@code true} the cache is {@code NEAR_PARTITIONED}, otherwise {@code PARTITIONED_ONLY}. */
+    private boolean nearEnabled = true;
+
+    /** Whether configured nodes get a cache; reset to {@code false} after each test. */
+    private boolean useCache;
+
+    /** Cache store to configure on the cache, if not {@code null}. */
+    private TestStore store;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Additionally resets {@link #useCache}, so nodes configured afterwards get no cache
+     * until a test switches the flag on again.
+     */
+    @Override public void afterTest() throws Exception {
+        super.afterTest();
+
+        useCache = false;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Sets up TCP discovery with the shared IP finder and the optimized marshaller. When
+     * {@link #useCache} is set, a transactional, fully synchronous cache is configured using
+     * {@link #mode}, {@link #nearEnabled} and, if present, {@link #store}; otherwise the node
+     * is configured with no caches.
+     */
+    @SuppressWarnings({"IfMayBeConditional", "unchecked"})
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        igniteCfg.setDiscoverySpi(discoSpi);
+        igniteCfg.setIncludeProperties();
+        igniteCfg.setMarshaller(new OptimizedMarshaller(false));
+
+        // Node without cache: nothing else to configure.
+        if (!useCache) {
+            igniteCfg.setCacheConfiguration();
+
+            return igniteCfg;
+        }
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(mode);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        if (nearEnabled)
+            cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+        else
+            cacheCfg.setDistributionMode(PARTITIONED_ONLY);
+
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cacheCfg.setEvictSynchronized(false);
+        cacheCfg.setEvictNearSynchronized(false);
+
+        if (store != null) {
+            cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+            cacheCfg.setReadThrough(true);
+            cacheCfg.setWriteThrough(true);
+        }
+
+        igniteCfg.setCacheConfiguration(cacheCfg);
+
+        return igniteCfg;
+    }
+
+    /**
+     * Runs the add/remove streamer scenario with cache mode {@code PARTITIONED}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitioned() throws Exception {
+        mode = PARTITIONED;
+
+        checkDataStreamer();
+    }
+
+    /**
+     * Runs the add/remove streamer scenario with cache mode {@code PARTITIONED} and
+     * {@link #nearEnabled} switched off, i.e. with {@code PARTITIONED_ONLY} distribution mode.
+     *
+     * @throws Exception If failed.
+     */
+    public void testColocated() throws Exception {
+        mode = PARTITIONED;
+        nearEnabled = false;
+
+        checkDataStreamer();
+    }
+
+    /**
+     * Runs the add/remove streamer scenario with cache mode {@code REPLICATED}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplicated() throws Exception {
+        mode = REPLICATED;
+
+        checkDataStreamer();
+    }
+
+    /**
+     * Checks that the add/remove streamer scenario fails with {@link IgniteCheckedException}
+     * when cache mode is {@code LOCAL}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLocal() throws Exception {
+        mode = LOCAL;
+
+        try {
+            checkDataStreamer();
+
+            // JUnit failure instead of 'assert false': the check must hold even if assertions are disabled.
+            fail("Exception has not been thrown for LOCAL cache.");
+        }
+        catch (IgniteCheckedException e) {
+            // Cannot load local cache configured remotely.
+            info("Caught expected exception: " + e);
+        }
+    }
+
+    /**
+     * Runs the add/remove scenario shared by the cache mode tests.
+     * <p>
+     * Node 1 is started before {@link #useCache} is switched on, nodes 2 and 3 after it. A streamer
+     * obtained from node 1 adds {@code threads * cnt} entries from several threads, then node 1 is
+     * stopped and the primary key sets of nodes 2 and 3 together are expected to hold all entries.
+     * The same keys are then removed through a streamer obtained from node 2 and both primary key
+     * sets are expected to be empty. All grids are stopped at the end.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ErrorNotRethrown")
+    private void checkDataStreamer() throws Exception {
+        try {
+            Ignite g1 = startGrid(1);
+
+            useCache = true; // Nodes configured from now on get a cache.
+
+            Ignite g2 = startGrid(2);
+            startGrid(3);
+
+            final IgniteDataStreamer<Integer, Integer> ldr = 
g1.dataStreamer(null);
+
+            ldr.updater(DataStreamerCacheUpdaters.<Integer, 
Integer>batchedSorted());
+
+            final AtomicInteger idxGen = new AtomicInteger();
+            final int cnt = 400;
+            final int threads = 10;
+
+            final CountDownLatch l1 = new CountDownLatch(threads);
+
+            IgniteInternalFuture<?> f1 = multithreadedAsync(new 
Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
+
+                    for (int i = 0; i < cnt; i++) {
+                        int idx = idxGen.getAndIncrement();
+
+                        futs.add(ldr.addData(idx, idx));
+                    }
+
+                    l1.countDown();
+
+                    for (IgniteFuture<?> fut : futs)
+                        fut.get();
+
+                    return null;
+                }
+            }, threads);
+
+            l1.await(); // Every thread has submitted all of its entries.
+
+            // This will wait until data streamer finishes loading.
+            stopGrid(getTestGridName(1), false);
+
+            f1.get();
+
+            int s2 = internalCache(2).primaryKeySet().size();
+            int s3 = internalCache(3).primaryKeySet().size();
+            int total = threads * cnt;
+
+            assertEquals(total, s2 + s3);
+
+            final IgniteDataStreamer<Integer, Integer> rmvLdr = 
g2.dataStreamer(null);
+
+            rmvLdr.updater(DataStreamerCacheUpdaters.<Integer, 
Integer>batchedSorted());
+
+            final CountDownLatch l2 = new CountDownLatch(threads);
+
+            IgniteInternalFuture<?> f2 = multithreadedAsync(new 
Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
+
+                    for (int i = 0; i < cnt; i++) {
+                        final int key = idxGen.decrementAndGet(); // Walks back over the keys added above.
+
+                        futs.add(rmvLdr.removeData(key));
+                    }
+
+                    l2.countDown();
+
+                    for (IgniteFuture<?> fut : futs)
+                        fut.get();
+
+                    return null;
+                }
+            }, threads);
+
+            l2.await(); // Every thread has submitted all of its removals.
+
+            rmvLdr.close(false);
+
+            f2.get();
+
+            s2 = internalCache(2).primaryKeySet().size();
+            s3 = internalCache(3).primaryKeySet().size();
+
+            assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + 
", s3=" + s3 + ']';
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Runs the isolated streamer scenario with cache mode {@code PARTITIONED}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionedIsolated() throws Exception {
+        mode = PARTITIONED;
+
+        checkIsolatedDataStreamer();
+    }
+
+    /**
+     * Runs the isolated streamer scenario with cache mode {@code REPLICATED}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplicatedIsolated() throws Exception {
+        mode = REPLICATED;
+
+        checkIsolatedDataStreamer();
+    }
+
+    /**
+     * Starts three nodes with cache, puts value {@code -1} for keys {@code 0..99} directly into the
+     * cache and then streams {@code key -> key} for {@code cnt * threads} keys from several threads
+     * through a streamer on which no options are set.
+     * <p>
+     * Afterwards every node is checked: for each key the node is primary or backup for, an entry
+     * must exist, keys below {@code 100} must still hold {@code -1} and all other keys must hold
+     * the key itself. All grids are stopped at the end.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkIsolatedDataStreamer() throws Exception {
+        try {
+            useCache = true;
+
+            Ignite g1 = startGrid(0);
+            startGrid(1);
+            startGrid(2);
+
+            awaitPartitionMapExchange();
+
+            IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
+
+            for (int i = 0; i < 100; i++)
+                cache.put(i, -1);
+
+            final int cnt = 40_000;
+            final int threads = 10;
+
+            try (final IgniteDataStreamer<Integer, Integer> ldr = 
g1.dataStreamer(null)) {
+                final AtomicInteger idxGen = new AtomicInteger();
+
+                IgniteInternalFuture<?> f1 = multithreadedAsync(new 
Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        for (int i = 0; i < cnt; i++) {
+                            int idx = idxGen.getAndIncrement();
+
+                            ldr.addData(idx, idx);
+                        }
+
+                        return null;
+                    }
+                }, threads);
+
+                f1.get();
+            }
+
+            for (int g = 0; g < 3; g++) {
+                ClusterNode locNode = grid(g).localNode();
+
+                GridCacheAdapter<Integer, Integer> cache0 = 
((IgniteKernal)grid(g)).internalCache(null);
+
+                if (cache0.isNear())
+                    cache0 = ((GridNearCacheAdapter<Integer, 
Integer>)cache0).dht();
+
+                CacheAffinity<Integer> aff = cache0.affinity();
+
+                for (int key = 0; key < cnt * threads; key++) {
+                    if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, 
key)) {
+                        GridCacheEntryEx entry = cache0.peekEx(key);
+
+                        assertNotNull("Missing entry for key: " + key, entry);
+                        assertEquals((key < 100 ? -1 : key),
+                            CU.value(entry.rawGetOrUnmarshal(false), 
cache0.context(), false));
+                    }
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Test primitive arrays can be passed into data streamer.
+     * <p>
+     * Starts two nodes with a {@code PARTITIONED} cache and, for 1000 keys, adds a primitive array
+     * and then the result of {@link #fixedClosure(Object)} for the same array under the same key,
+     * cycling through arrays of all eight primitive types. The test passes if no call throws.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimitiveArrays() throws Exception {
+        try {
+            useCache = true;
+            mode = PARTITIONED;
+
+            Ignite g1 = startGrid(1);
+            startGrid(2); // Reproduced only for several nodes in topology (if 
marshalling is used).
+
+            List<Object> arrays = Arrays.<Object>asList(
+                new byte[] {1}, new boolean[] {true, false}, new char[] {2, 
3}, new short[] {3, 4},
+                new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new 
double[] {7, 8});
+
+            IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null);
+
+            for (int i = 0, size = arrays.size(); i < 1000; i++) {
+                Object arr = arrays.get(i % size);
+
+                dataLdr.addData(i, arr);
+                dataLdr.addData(i, fixedClosure(arr));
+            }
+
+            dataLdr.close(false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Runs the multithreaded streamer scenario with cache mode {@code REPLICATED}:
+     * one node without cache and two nodes with cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplicatedMultiThreaded() throws Exception {
+        mode = REPLICATED;
+
+        checkLoaderMultithreaded(1, 2);
+    }
+
+    /**
+     * Runs the multithreaded streamer scenario with cache mode {@code PARTITIONED}:
+     * one node without cache and three nodes with cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionedMultiThreaded() throws Exception {
+        mode = PARTITIONED;
+
+        checkLoaderMultithreaded(1, 3);
+    }
+
+    /**
+     * Tests loader in multithreaded environment with various count of grids started.
+     * <p>
+     * Nodes are started with indexes beginning at 1: first the nodes without cache, then the nodes
+     * with cache. A streamer obtained from node 1 (individual updater, per-node buffer size 2) is
+     * fed by 5 producer threads until {@code totalPutCnt} keys are handed out or the {@code done}
+     * flag is set. Meanwhile one thread flushes the streamer every 100 ms and another one starts
+     * and stops an additional node 5 times, setting {@code done} when it finishes. The streamer is
+     * closed and all grids are stopped at the end.
+     *
+     * @param nodesCntNoCache How many nodes should be started without cache.
+     * @param nodesCntCache How many nodes should be started with cache.
+     * @throws Exception If failed.
+     */
+    protected void checkLoaderMultithreaded(int nodesCntNoCache, int 
nodesCntCache)
+        throws Exception {
+        try {
+            // Start all required nodes.
+            int idx = 1;
+
+            for (int i = 0; i < nodesCntNoCache; i++)
+                startGrid(idx++);
+
+            useCache = true;
+
+            for (int i = 0; i < nodesCntCache; i++)
+                startGrid(idx++);
+
+            Ignite g1 = grid(1);
+
+            // Get and configure loader.
+            final IgniteDataStreamer<Integer, Integer> ldr = 
g1.dataStreamer(null);
+
+            ldr.updater(DataStreamerCacheUpdaters.<Integer, 
Integer>individual());
+            ldr.perNodeBufferSize(2);
+
+            // Define count of puts.
+            final AtomicInteger idxGen = new AtomicInteger();
+
+            final AtomicBoolean done = new AtomicBoolean();
+
+            try {
+                final int totalPutCnt = 50000;
+
+                IgniteInternalFuture<?> fut1 = multithreadedAsync(new 
Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        Collection<IgniteFuture<?>> futs = new ArrayList<>();
+
+                        while (!done.get()) {
+                            int idx = idxGen.getAndIncrement();
+
+                            if (idx >= totalPutCnt) {
+                                info(">>> Stopping producer thread since 
maximum count of puts reached.");
+
+                                break;
+                            }
+
+                            futs.add(ldr.addData(idx, idx));
+                        }
+
+                        ldr.flush();
+
+                        for (IgniteFuture<?> fut : futs)
+                            fut.get();
+
+                        return null;
+                    }
+                }, 5, "producer");
+
+                IgniteInternalFuture<?> fut2 = multithreadedAsync(new 
Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        while (!done.get()) {
+                            ldr.flush();
+
+                            U.sleep(100);
+                        }
+
+                        return null;
+                    }
+                }, 1, "flusher");
+
+                // Define index of node being restarted.
+                final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
+
+                IgniteInternalFuture<?> fut3 = multithreadedAsync(new 
Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        try {
+                            for (int i = 0; i < 5; i++) {
+                                Ignite g = startGrid(restartNodeIdx);
+
+                                UUID id = g.cluster().localNode().id();
+
+                                info(">>>>>>> Started node: " + id);
+
+                                U.sleep(1000);
+
+                                stopGrid(getTestGridName(restartNodeIdx), 
true);
+
+                                info(">>>>>>> Stopped node: " + id);
+                            }
+                        }
+                        finally {
+                            done.set(true); // Lets producer and flusher threads leave their loops.
+
+                            info("Start stop thread finished.");
+                        }
+
+                        return null;
+                    }
+                }, 1, "start-stop-thread");
+
+                fut1.get();
+                fut2.get();
+                fut3.get();
+            }
+            finally {
+                ldr.close(false);
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Checks streamer behavior after it has been closed in different ways.
+     * <p>
+     * Covered cases: adding data after {@code close(false)}, requesting a streamer for an unknown
+     * cache, cancelling a streamer through its future and stopping the node that owns the streamer.
+     * In every case adding data is expected to fail with {@link IllegalStateException} and the
+     * streamer future is expected to be done.
+     * <p>
+     * Expected-exception checks use JUnit {@code fail()} and {@code assertTrue()} rather than the
+     * {@code assert} keyword, so they are not skipped when JVM assertions are disabled.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLoaderApi() throws Exception {
+        useCache = true;
+
+        try {
+            Ignite g1 = startGrid(1);
+
+            IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null);
+
+            ldr.close(false);
+
+            try {
+                ldr.addData(0, 0);
+
+                fail("Data has been added to a closed streamer.");
+            }
+            catch (IllegalStateException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            assertTrue(ldr.future().isDone());
+
+            ldr.future().get();
+
+            try {
+                // Create another loader.
+                ldr = g1.dataStreamer("UNKNOWN_CACHE");
+
+                fail("Streamer has been created for an unknown cache.");
+            }
+            catch (IllegalStateException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            // The assignment above did not happen, so this is still the first, already closed streamer.
+            ldr.close(true);
+
+            assertTrue(ldr.future().isDone());
+
+            ldr.future().get();
+
+            // Create another loader.
+            ldr = g1.dataStreamer(null);
+
+            // Cancel with future.
+            ldr.future().cancel();
+
+            try {
+                ldr.addData(0, 0);
+
+                fail("Data has been added to a cancelled streamer.");
+            }
+            catch (IllegalStateException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            assertTrue(ldr.future().isDone());
+
+            try {
+                ldr.future().get();
+
+                fail("Future of a cancelled streamer has completed normally.");
+            }
+            catch (IgniteFutureCancelledException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            // Create another loader.
+            ldr = g1.dataStreamer(null);
+
+            // This will close loader.
+            stopGrid(getTestGridName(1), false);
+
+            try {
+                ldr.addData(0, 0);
+
+                fail("Data has been added to a streamer of a stopped node.");
+            }
+            catch (IllegalStateException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            assertTrue(ldr.future().isDone());
+
+            ldr.future().get();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Creates a callable that always returns the given integer.
+     *
+     * @param i Value the callable returns, possibly {@code null}.
+     * @return Callable returning {@code i}.
+     */
+    private static Callable<Integer> callable(@Nullable final Integer i) {
+        Callable<Integer> wrapper = new Callable<Integer>() {
+            @Override public Integer call() {
+                return i;
+            }
+        };
+
+        return wrapper;
+    }
+
+    /**
+     * Creates a closure that adds the given integer to the value it is applied to.
+     * <p>
+     * Applied to {@code null}, the closure returns {@code i} as is, which may be {@code null}.
+     * Applied to a non-null value, it returns the sum, so {@code i} must not be {@code null}
+     * in that case.
+     *
+     * @param i Value to add.
+     * @return Closure.
+     */
+    private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) {
+        return new IgniteClosure<Integer, Integer>() {
+            @Override public Integer apply(Integer e) {
+                // Explicit branch instead of 'e == null ? i : e + i': that conditional is typed as int,
+                // so a null 'i' was unboxed and threw NPE even when it only had to be returned.
+                if (e == null)
+                    return i;
+
+                return e + i;
+            }
+        };
+    }
+
+    /**
+     * Creates a closure that ignores the value it is applied to and always returns the given object.
+     * The closure asserts that the applied value, when both are non-null, has the same class as
+     * the object.
+     *
+     * @param obj Object the closure returns, possibly {@code null}.
+     * @return Closure.
+     */
+    private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) {
+        IgniteClosure<T, T> clo = new IgniteClosure<T, T>() {
+            @Override public T apply(T cur) {
+                assert cur == null || obj == null || cur.getClass() == obj.getClass() :
+                    "Expects the same types [e=" + cur + ", obj=" + obj + ']';
+
+                return obj;
+            }
+        };
+
+        return clo;
+    }
+
+    /**
+     * Creates a closure that returns {@code null} when applied to the expected value and throws
+     * {@link AssertionError} for any other value.
+     *
+     * @param exp Expected closure value, possibly {@code null}.
+     * @return Remove expected cache value closure.
+     */
+    private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) {
+        return new IgniteClosure<T, T>() {
+            @Override public T apply(T act) {
+                boolean matches;
+
+                if (exp == null)
+                    matches = act == null;
+                else
+                    matches = exp.equals(act);
+
+                if (!matches)
+                    throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']');
+
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Checks explicit flushing on a single node with a {@code LOCAL} cache.
+     * <p>
+     * With per-node buffer size 10, 9 entries are added and the cache is expected to be still
+     * empty. Then {@code flush()} is called from 5 threads, each expecting 9 entries in the cache
+     * afterwards. One more entry followed by a flush must give 10 entries, and one more entry
+     * followed by {@code close(false)} must give 11.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFlush() throws Exception {
+        mode = LOCAL;
+
+        useCache = true;
+
+        try {
+            Ignite g = startGrid();
+
+            final IgniteCache<Integer, Integer> c = g.jcache(null);
+
+            final IgniteDataStreamer<Integer, Integer> ldr = 
g.dataStreamer(null);
+
+            ldr.perNodeBufferSize(10);
+
+            for (int i = 0; i < 9; i++)
+                ldr.addData(i, i);
+
+            assertTrue(c.localSize() == 0);
+
+            multithreaded(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    ldr.flush();
+
+                    assertEquals(9, c.size());
+
+                    return null;
+                }
+            }, 5, "flush-checker");
+
+            ldr.addData(100, 100);
+
+            ldr.flush();
+
+            assertEquals(10, c.size());
+
+            ldr.addData(200, 200);
+
+            ldr.close(false);
+
+            ldr.future().get();
+
+            assertEquals(11, c.size());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Checks {@code tryFlush()} on a single node with a {@code LOCAL} cache.
+     * <p>
+     * With per-node buffer size 10, 9 entries are added and the cache is expected to be still
+     * empty. After {@code tryFlush()} and a 100 ms pause the cache is expected to hold 9 entries.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTryFlush() throws Exception {
+        mode = LOCAL;
+
+        useCache = true;
+
+        try {
+            Ignite g = startGrid();
+
+            IgniteCache<Integer, Integer> c = g.jcache(null);
+
+            IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+
+            ldr.perNodeBufferSize(10);
+
+            for (int i = 0; i < 9; i++)
+                ldr.addData(i, i);
+
+            assertTrue(c.localSize() == 0);
+
+            ldr.tryFlush();
+
+            Thread.sleep(100); // NOTE(review): fixed pause before checking the size; timing-dependent.
+
+            assertEquals(9, c.size());
+
+            ldr.close(false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Checks automatic flushing on a single node with a {@code LOCAL} cache.
+     * <p>
+     * A local listener counts down a latch of 9 on every {@code EVT_CACHE_OBJECT_PUT} event. The
+     * streamer is set up with per-node buffer size 10, auto flush frequency 3000 ms and overwrite
+     * allowed, and 9 entries are added. The latch must not open within the first 1000 ms and the
+     * cache must stay empty; within the following 3000 ms the latch must open and the cache must
+     * hold 9 entries.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFlushTimeout() throws Exception {
+        mode = LOCAL;
+
+        useCache = true;
+
+        try {
+            Ignite g = startGrid();
+
+            final CountDownLatch latch = new CountDownLatch(9);
+
+            g.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    latch.countDown();
+
+                    return true;
+                }
+            }, EVT_CACHE_OBJECT_PUT);
+
+            IgniteCache<Integer, Integer> c = g.jcache(null);
+
+            assertTrue(c.localSize() == 0);
+
+            IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+
+            ldr.perNodeBufferSize(10);
+            ldr.autoFlushFrequency(3000);
+            ldr.allowOverwrite(true);
+
+            for (int i = 0; i < 9; i++)
+                ldr.addData(i, i);
+
+            assertTrue(c.localSize() == 0);
+
+            assertFalse(latch.await(1000, MILLISECONDS)); // Not all 9 put events may arrive this early.
+
+            assertTrue(c.localSize() == 0);
+
+            assertTrue(latch.await(3000, MILLISECONDS));
+
+            assertEquals(9, c.size());
+
+            ldr.close(false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Checks how streamer updates reach the cache store configured through {@link TestStore}.
+     * <p>
+     * First pass, with overwrite allowed and {@code skipStore()} reported as {@code false}:
+     * keys {@code 0..999}, pre-populated in the store map, are removed and keys {@code 1000..1999}
+     * are added; the store map must reflect both. Second pass, with {@code skipStore(true)}:
+     * keys {@code 0..999} are added and keys {@code 1000..1999} are removed; the store map must
+     * stay unchanged while the cache contains the first range and not the second.
+     * <p>
+     * Started grids are stopped and the store state is reset at the end, as in the other tests
+     * of this class, so that nodes 1-3 do not leak into tests that run afterwards.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUpdateStore() throws Exception {
+        storeMap = new ConcurrentHashMap<>();
+
+        try {
+            store = new TestStore();
+
+            useCache = true;
+
+            Ignite ignite = startGrid(1);
+
+            startGrid(2);
+            startGrid(3);
+
+            for (int i = 0; i < 1000; i++)
+                storeMap.put(i, i);
+
+            try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
+                ldr.allowOverwrite(true);
+
+                assertFalse(ldr.skipStore());
+
+                for (int i = 0; i < 1000; i++)
+                    ldr.removeData(i);
+
+                for (int i = 1000; i < 2000; i++)
+                    ldr.addData(i, i);
+            }
+
+            for (int i = 0; i < 1000; i++)
+                assertNull(storeMap.get(i));
+
+            for (int i = 1000; i < 2000; i++)
+                assertEquals(i, storeMap.get(i));
+
+            try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
+                ldr.allowOverwrite(true);
+
+                ldr.skipStore(true);
+
+                for (int i = 0; i < 1000; i++)
+                    ldr.addData(i, i);
+
+                for (int i = 1000; i < 2000; i++)
+                    ldr.removeData(i);
+            }
+
+            IgniteCache<Object, Object> cache = ignite.jcache(null);
+
+            for (int i = 0; i < 1000; i++) {
+                assertNull(storeMap.get(i));
+
+                assertEquals(i, cache.get(i));
+            }
+
+            for (int i = 1000; i < 2000; i++) {
+                assertEquals(i, storeMap.get(i));
+
+                assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));
+            }
+        }
+        finally {
+            // Stop nodes first, while the store map still exists.
+            stopAllGrids();
+
+            store = null;
+            storeMap = null;
+        }
+    }
+
+    /**
+     * Checks that a user-defined updater is applied to streamed entries.
+     * <p>
+     * Three nodes with cache are started. The streamer gets an updater that, for every entry,
+     * puts a new {@link TestObject} with the value increased by one. After 100 entries with keys
+     * {@code "0".."99"} are streamed and the streamer is closed, every key must map to an object
+     * holding {@code i + 1}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCustomUserUpdater() throws Exception {
+        useCache = true;
+
+        try {
+            Ignite ignite = startGrid(1);
+
+            startGrid(2);
+            startGrid(3);
+
+            try (IgniteDataStreamer<String, TestObject> ldr = 
ignite.dataStreamer(null)) {
+                ldr.allowOverwrite(true);
+
+                ldr.updater(new IgniteDataStreamer.Updater<String, 
TestObject>() {
+                    @Override public void update(IgniteCache<String, 
TestObject> cache,
+                        Collection<Map.Entry<String, TestObject>> entries) {
+                        for (Map.Entry<String, TestObject> e : entries) {
+                            assertTrue(e.getKey() instanceof String);
+                            assertTrue(e.getValue() instanceof TestObject);
+
+                            cache.put(e.getKey(), new 
TestObject(e.getValue().val + 1));
+                        }
+                    }
+                });
+
+                for (int i = 0; i < 100; i++)
+                    ldr.addData(String.valueOf(i), new TestObject(i));
+            }
+
+            IgniteCache<String, TestObject> cache = ignite.jcache(null);
+
+            for (int i = 0; i < 100; i++) {
+                TestObject val = cache.get(String.valueOf(i));
+
+                assertNotNull(val);
+                assertEquals(i + 1, val.val);
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Simple value object wrapping an integer; equality and hash code are based on that integer.
+     */
+    private static class TestObject {
+        /** Wrapped value. */
+        private final int val;
+
+        /**
+         * @param val Wrapped value.
+         */
+        private TestObject(int val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            // Only an instance of exactly the same class can be equal.
+            if (o != null && o.getClass() == getClass())
+                return ((TestObject)o).val == val;
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestStore extends CacheStoreAdapter<Object, Object> {
+        /** {@inheritDoc} */
+        @Nullable @Override public Object load(Object key) {
+            return storeMap.get(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) {
+            storeMap.put(entry.getKey(), entry.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) {
+            storeMap.remove(key);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java
new file mode 100644
index 0000000..3d100e1
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jdk8.backport.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Data streamer performance test. Streams random entries from several threads and logs the throughput.
+ * <p>
+ * Disable assertions and give at least 2 GB heap to run this test.
+ */
+public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest {
+    /** IP finder shared by every grid this test starts. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of grids started with the cache configured. */
+    private static final int GRID_CNT = 3;
+
+    /** Bound of the random key range; also used to size the cache. */
+    private static final int ENTRY_CNT = 80000;
+
+    /** Whether grids started next are configured with the cache. */
+    private boolean useCache;
+
+    /** Pre-generated random values to stream. */
+    private final String[] vals = new String[2048];
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(spi);
+
+        cfg.setIncludeProperties();
+
+        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+
+        cfg.setConnectorConfiguration(null);
+        cfg.setPeerClassLoadingEnabled(true);
+
+        // Only grids started while the flag is set get the cache; the others get an empty cache configuration.
+        if (useCache) {
+            CacheConfiguration cc = defaultCacheConfiguration();
+
+            cc.setCacheMode(PARTITIONED);
+
+            cc.setDistributionMode(PARTITIONED_ONLY);
+            cc.setWriteSynchronizationMode(FULL_SYNC);
+            cc.setStartSize(ENTRY_CNT / GRID_CNT);
+            cc.setSwapEnabled(false);
+
+            cc.setBackups(1);
+
+            cfg.setCacheSanityCheckEnabled(false);
+            cfg.setCacheConfiguration(cc);
+        }
+        else
+            cfg.setCacheConfiguration();
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        for (int i = 0; i < vals.length; i++) {
+            int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
+            StringBuilder sb = new StringBuilder();
+
+            // The cast matters: 'a' + int is an int and would be appended as a decimal number, not a letter.
+            for (int j = 0; j < valLen; j++)
+                sb.append((char)('a' + ThreadLocalRandom8.current().nextInt(20)));
+
+            vals[i] = sb.toString();
+
+            info("Value: " + vals[i]);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPerformance() throws Exception {
+        doTest();
+    }
+
+    /**
+     * Starts the grids, then streams random entries from worker threads while a reporter logs the rate.
+     * @throws Exception If failed.
+     */
+    private void doTest() throws Exception {
+        // System.gc() is only a hint to the JVM; it is requested several times before the load starts.
+        System.gc();
+        System.gc();
+        System.gc();
+
+        try {
+            // These grids are started with the cache configured.
+            useCache = true;
+            startGridsMultiThreaded(GRID_CNT);
+
+            // The grid used for streaming is started without the cache configuration.
+            useCache = false;
+            Ignite ignite = startGrid();
+
+            final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
+
+            ldr.perNodeBufferSize(8192);
+            ldr.updater(DataStreamerCacheUpdaters.<Integer, String>batchedSorted());
+            ldr.autoFlushFrequency(0);
+
+            final LongAdder cnt = new LongAdder();
+            long start = U.currentTimeMillis();
+
+            // Reporter thread: every 10 seconds logs the average adds per second since the previous report.
+            Thread t = new Thread(new Runnable() {
+                @SuppressWarnings("BusyWait")
+                @Override public void run() {
+                    while (true) {
+                        try {
+                            Thread.sleep(10000);
+                        }
+                        catch (InterruptedException ignored) {
+                            break;
+                        }
+
+                        info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
+                    }
+                }
+            });
+
+            t.setDaemon(true);
+            t.start();
+
+            // Workers loop until an exception is thrown: the loop has no other exit condition.
+            int threadNum = 2;//Runtime.getRuntime().availableProcessors();
+
+            multithreaded(new Callable<Object>() {
+                @SuppressWarnings("InfiniteLoopStatement")
+                @Override public Object call() throws Exception {
+                    ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
+
+                    while (true) {
+                        ldr.addData(rnd.nextInt(ENTRY_CNT), vals[rnd.nextInt(vals.length)]);
+
+                        cnt.increment();
+                    }
+                }
+            }, threadNum, "loader");
+
+            info("Closing loader...");
+
+            ldr.close(false);
+
+            long duration = U.currentTimeMillis() - start;
+
+            info("Finished performance test. Duration: " + duration + "ms.");
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 96e041c..f34f101 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.preloader.*;
 import org.apache.ignite.internal.processors.cache.integration.*;
 import org.apache.ignite.internal.processors.cache.local.*;
-import org.apache.ignite.internal.processors.datastream.*;
+import org.apache.ignite.internal.processors.datastreamer.*;
 
 /**
  * Test suite.

Reply via email to