http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index 0000000,e6da0d1..1a386cb
mode 000000,100644..100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@@ -1,0 -1,970 +1,974 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.datastreamer;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cache.affinity.*;
+ import org.apache.ignite.cache.store.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.near.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.optimized.*;
+ import org.apache.ignite.spi.discovery.tcp.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+ import org.apache.ignite.testframework.junits.common.*;
+ import org.jetbrains.annotations.*;
+ 
+ import javax.cache.*;
+ import javax.cache.configuration.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static java.util.concurrent.TimeUnit.*;
+ import static org.apache.ignite.cache.CacheAtomicityMode.*;
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
+ import static org.apache.ignite.cache.CacheMode.*;
+ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+ import static org.apache.ignite.events.EventType.*;
+ 
+ /**
+  *
+  */
+ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
+     /** */
+     private static ConcurrentHashMap<Object, Object> storeMap;
+ 
+     /** */
+     private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+ 
+     /** */
+     private CacheMode mode = PARTITIONED;
+ 
+     /** */
+     private boolean nearEnabled = true;
+ 
+     /** */
+     private boolean useCache;
+ 
+     /** */
+     private TestStore store;
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * After delegating to the parent implementation, resets {@code useCache} to {@code false}.
+      */
+     @Override public void afterTest() throws Exception {
+         super.afterTest();
+ 
+         useCache = false;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Builds on the parent configuration: installs a {@link TcpDiscoverySpi} that uses this
+      * test's {@code ipFinder}, calls {@code setIncludeProperties()} with no arguments and sets an
+      * {@link OptimizedMarshaller} constructed with {@code false}.
+      * <p>
+      * When {@code useCache} is set, one cache is configured from {@code defaultCacheConfiguration()}
+      * with the current {@code mode}, {@code TRANSACTIONAL} atomicity, {@code FULL_SYNC} write
+      * synchronization, a {@link NearCacheConfiguration} if {@code nearEnabled} is set, and, if
+      * {@code store} is non-null, a singleton store factory with read-through and write-through
+      * enabled. Otherwise {@code setCacheConfiguration()} is called with no arguments.
+      */
+     @SuppressWarnings({"IfMayBeConditional", "unchecked"})
+     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+         IgniteConfiguration cfg = super.getConfiguration(gridName);
+ 
+         TcpDiscoverySpi spi = new TcpDiscoverySpi();
+ 
+         spi.setIpFinder(ipFinder);
+ 
+         cfg.setDiscoverySpi(spi);
+ 
+         cfg.setIncludeProperties();
+ 
+         cfg.setMarshaller(new OptimizedMarshaller(false));
+ 
+         if (useCache) {
+             CacheConfiguration cc = defaultCacheConfiguration();
+ 
+             cc.setCacheMode(mode);
+             cc.setAtomicityMode(TRANSACTIONAL);
 -            cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : 
PARTITIONED_ONLY);
++
++            if (nearEnabled) {
++                NearCacheConfiguration nearCfg = new NearCacheConfiguration();
++
++                cc.setNearConfiguration(nearCfg);
++            }
++
+             cc.setWriteSynchronizationMode(FULL_SYNC);
+ 
+             cc.setEvictSynchronized(false);
 -            cc.setEvictNearSynchronized(false);
+ 
+             if (store != null) {
+                 cc.setCacheStoreFactory(new 
FactoryBuilder.SingletonFactory(store));
+                 cc.setReadThrough(true);
+                 cc.setWriteThrough(true);
+             }
+ 
+             cfg.setCacheConfiguration(cc);
+         }
+         else
+             cfg.setCacheConfiguration();
+ 
+         return cfg;
+     }
+ 
+     /**
+      * Runs {@code checkDataStreamer()} with the cache mode set to {@code PARTITIONED};
+      * {@code nearEnabled} is left at whatever value the instance currently holds.
+      *
+      * @throws Exception If failed.
+      */
+     public void testPartitioned() throws Exception {
+         mode = PARTITIONED;
+ 
+         checkDataStreamer();
+     }
+ 
+     /**
+      * Runs {@code checkDataStreamer()} with the cache mode set to {@code PARTITIONED} and
+      * {@code nearEnabled} switched off, so no near cache configuration is applied.
+      *
+      * @throws Exception If failed.
+      */
+     public void testColocated() throws Exception {
+         mode = PARTITIONED;
+         nearEnabled = false;
+ 
+         checkDataStreamer();
+     }
+ 
+     /**
+      * Runs {@code checkDataStreamer()} with the cache mode set to {@code REPLICATED}.
+      *
+      * @throws Exception If failed.
+      */
+     public void testReplicated() throws Exception {
+         mode = REPLICATED;
+ 
+         checkDataStreamer();
+     }
+ 
+     /**
+      * Runs {@code checkDataStreamer()} with the cache mode set to {@code LOCAL} and expects it to
+      * be rejected with an {@link IgniteCheckedException}.
+      * <p>
+      * If no exception is thrown the test is failed explicitly.
+      *
+      * @throws Exception If failed.
+      */
+     public void testLocal() throws Exception {
+         mode = LOCAL;
+ 
+         try {
+             checkDataStreamer();
+ 
+             // fail() instead of 'assert false': the latter is a no-op when the JVM runs without -ea.
+             fail("Exception is expected: local cache cannot be loaded from a remote node.");
+         }
+         catch (IgniteCheckedException e) {
+             // Cannot load local cache configured remotely.
+             info("Caught expected exception: " + e);
+         }
+     }
+ 
+     /**
+      * Shared scenario for the non-isolated streamer tests.
+      * <p>
+      * Grid 1 is started before {@code useCache} is switched on; grids 2 and 3 are started after.
+      * The scenario then runs in two phases, each using a {@code batchedSorted()} updater and
+      * {@code threads} worker threads that submit {@code cnt} operations apiece and then wait on the
+      * returned futures:
+      * <ol>
+      * <li>Keys taken from a shared counter are added through a streamer obtained from grid 1. Once
+      *     every worker has submitted its data, grid 1 is stopped and the sizes of the primary key
+      *     sets on grids 2 and 3 must add up to {@code threads * cnt}.</li>
+      * <li>The same counter is decremented to produce keys that are removed through a streamer
+      *     obtained from grid 2. After that streamer is closed, both primary key sets must be
+      *     empty.</li>
+      * </ol>
+      * All grids are stopped in the {@code finally} block.
+      *
+      * @throws Exception If failed.
+      */
+     @SuppressWarnings("ErrorNotRethrown")
+     private void checkDataStreamer() throws Exception {
+         try {
+             Ignite g1 = startGrid(1);
+ 
+             useCache = true;
+ 
+             Ignite g2 = startGrid(2);
+             startGrid(3);
+ 
+             final IgniteDataStreamer<Integer, Integer> ldr = 
g1.dataStreamer(null);
+ 
+             ldr.updater(DataStreamerCacheUpdaters.<Integer, 
Integer>batchedSorted());
+ 
+             final AtomicInteger idxGen = new AtomicInteger();
+             final int cnt = 400;
+             final int threads = 10;
+ 
+             final CountDownLatch l1 = new CountDownLatch(threads);
+ 
+             IgniteInternalFuture<?> f1 = multithreadedAsync(new 
Callable<Object>() {
+                 @Override public Object call() throws Exception {
+                     Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
+ 
+                     for (int i = 0; i < cnt; i++) {
+                         int idx = idxGen.getAndIncrement();
+ 
+                         futs.add(ldr.addData(idx, idx));
+                     }
+ 
+                     l1.countDown();
+ 
+                     for (IgniteFuture<?> fut : futs)
+                         fut.get();
+ 
+                     return null;
+                 }
+             }, threads);
+ 
+             l1.await();
+ 
+             // This will wait until data streamer finishes loading.
+             stopGrid(getTestGridName(1), false);
+ 
+             f1.get();
+ 
+             int s2 = internalCache(2).primaryKeySet().size();
+             int s3 = internalCache(3).primaryKeySet().size();
+             int total = threads * cnt;
+ 
+             assertEquals(total, s2 + s3);
+ 
+             final IgniteDataStreamer<Integer, Integer> rmvLdr = 
g2.dataStreamer(null);
+ 
+             rmvLdr.updater(DataStreamerCacheUpdaters.<Integer, 
Integer>batchedSorted());
+ 
+             final CountDownLatch l2 = new CountDownLatch(threads);
+ 
+             IgniteInternalFuture<?> f2 = multithreadedAsync(new 
Callable<Object>() {
+                 @Override public Object call() throws Exception {
+                     Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
+ 
+                     for (int i = 0; i < cnt; i++) {
+                         final int key = idxGen.decrementAndGet();
+ 
+                         futs.add(rmvLdr.removeData(key));
+                     }
+ 
+                     l2.countDown();
+ 
+                     for (IgniteFuture<?> fut : futs)
+                         fut.get();
+ 
+                     return null;
+                 }
+             }, threads);
+ 
+             l2.await();
+ 
+             rmvLdr.close(false);
+ 
+             f2.get();
+ 
+             s2 = internalCache(2).primaryKeySet().size();
+             s3 = internalCache(3).primaryKeySet().size();
+ 
+             assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + 
", s3=" + s3 + ']';
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ 
+     /**
+      * Runs {@code checkIsolatedDataStreamer()} with the cache mode set to {@code PARTITIONED}.
+      *
+      * @throws Exception If failed.
+      */
+     public void testPartitionedIsolated() throws Exception {
+         mode = PARTITIONED;
+ 
+         checkIsolatedDataStreamer();
+     }
+ 
+     /**
+      * Runs {@code checkIsolatedDataStreamer()} with the cache mode set to {@code REPLICATED}.
+      *
+      * @throws Exception If failed.
+      */
+     public void testReplicatedIsolated() throws Exception {
+         mode = REPLICATED;
+ 
+         checkIsolatedDataStreamer();
+     }
+ 
+     /**
+      * Shared scenario checking that a streamer with untouched settings does not replace values
+      * that are already in the cache.
+      * <p>
+      * Three cache-enabled grids (0, 1, 2) are started. Keys {@code 0..99} are first put with the
+      * value {@code -1} through the cache API. Then {@code threads} workers each stream {@code cnt}
+      * entries of the form {@code idx -> idx}, with {@code idx} drawn from a shared counter, through
+      * a streamer obtained from grid 0; the streamer is closed by try-with-resources.
+      * <p>
+      * Afterwards, on every grid, each key in {@code [0, cnt * threads)} for which the local node is
+      * primary or backup must have an entry. Keys below 100 must still hold {@code -1}; every other
+      * key must hold itself. If the internal cache is a near cache, its DHT cache is inspected.
+      * All grids are stopped in the {@code finally} block.
+      *
+      * @throws Exception If failed.
+      */
+     private void checkIsolatedDataStreamer() throws Exception {
+         try {
+             useCache = true;
+ 
+             Ignite g1 = startGrid(0);
+             startGrid(1);
+             startGrid(2);
+ 
+             awaitPartitionMapExchange();
+ 
+             IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
+ 
+             for (int i = 0; i < 100; i++)
+                 cache.put(i, -1);
+ 
+             final int cnt = 40_000;
+             final int threads = 10;
+ 
+             try (final IgniteDataStreamer<Integer, Integer> ldr = 
g1.dataStreamer(null)) {
+                 final AtomicInteger idxGen = new AtomicInteger();
+ 
+                 IgniteInternalFuture<?> f1 = multithreadedAsync(new 
Callable<Object>() {
+                     @Override public Object call() throws Exception {
+                         for (int i = 0; i < cnt; i++) {
+                             int idx = idxGen.getAndIncrement();
+ 
+                             ldr.addData(idx, idx);
+                         }
+ 
+                         return null;
+                     }
+                 }, threads);
+ 
+                 f1.get();
+             }
+ 
+             for (int g = 0; g < 3; g++) {
+                 ClusterNode locNode = grid(g).localNode();
+ 
+                 GridCacheAdapter<Integer, Integer> cache0 = 
((IgniteKernal)grid(g)).internalCache(null);
+ 
+                 if (cache0.isNear())
+                     cache0 = ((GridNearCacheAdapter<Integer, 
Integer>)cache0).dht();
+ 
+                 CacheAffinity<Integer> aff = cache0.affinity();
+ 
+                 for (int key = 0; key < cnt * threads; key++) {
+                     if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, 
key)) {
+                         GridCacheEntryEx entry = cache0.peekEx(key);
+ 
+                         assertNotNull("Missing entry for key: " + key, entry);
+                         assertEquals((key < 100 ? -1 : key),
+                             CU.value(entry.rawGetOrUnmarshal(false), 
cache0.context(), false));
+                     }
+                 }
+             }
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ 
+     /**
+      * Tests that primitive arrays can be passed into a data streamer.
+      * <p>
+      * Two cache-enabled grids are started with a {@code PARTITIONED} cache. For 1000 integer keys
+      * the test adds, through a streamer obtained from grid 1, one of eight primitive arrays
+      * (one per primitive type) and then, for the same key, the object returned by
+      * {@code fixedClosure(arr)}. The test makes no assertions on cache contents; it passes if
+      * adding the data and closing the streamer complete without an exception.
+      *
+      * @throws Exception If failed.
+      */
+     public void testPrimitiveArrays() throws Exception {
+         try {
+             useCache = true;
+             mode = PARTITIONED;
+ 
+             Ignite g1 = startGrid(1);
+             startGrid(2); // Reproduced only for several nodes in topology 
(if marshalling is used).
+ 
+             List<Object> arrays = Arrays.<Object>asList(
+                 new byte[] {1}, new boolean[] {true, false}, new char[] {2, 
3}, new short[] {3, 4},
+                 new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new 
double[] {7, 8});
+ 
+             IgniteDataStreamer<Object, Object> dataLdr = 
g1.dataStreamer(null);
+ 
+             for (int i = 0, size = arrays.size(); i < 1000; i++) {
+                 Object arr = arrays.get(i % size);
+ 
+                 dataLdr.addData(i, arr);
+                 dataLdr.addData(i, fixedClosure(arr));
+             }
+ 
+             dataLdr.close(false);
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ 
+     /**
+      * Runs {@code checkLoaderMultithreaded()} in {@code REPLICATED} mode with one node started
+      * without a cache and two nodes started with a cache.
+      *
+      * @throws Exception If failed.
+      */
+     public void testReplicatedMultiThreaded() throws Exception {
+         mode = REPLICATED;
+ 
+         checkLoaderMultithreaded(1, 2);
+     }
+ 
+     /**
+      * Runs {@code checkLoaderMultithreaded()} in {@code PARTITIONED} mode with one node started
+      * without a cache and three nodes started with a cache.
+      *
+      * @throws Exception If failed.
+      */
+     public void testPartitionedMultiThreaded() throws Exception {
+         mode = PARTITIONED;
+ 
+         checkLoaderMultithreaded(1, 3);
+     }
+ 
+     /**
+      * Exercises a streamer from several threads while the topology changes.
+      * <p>
+      * Nodes are started with consecutive indexes beginning at 1: first the cache-less ones, then,
+      * after {@code useCache} is switched on, the ones with a cache. A streamer obtained from grid 1
+      * is configured with the {@code individual()} updater and a per-node buffer size of 2. Three
+      * groups of threads then run concurrently:
+      * <ul>
+      * <li>five "producer" threads add {@code idx -> idx} entries drawn from a shared counter until
+      *     either {@code done} is set or the counter reaches {@code totalPutCnt}, then flush and wait
+      *     on every returned future;</li>
+      * <li>one "flusher" thread calls {@code flush()} and sleeps 100 ms in a loop until {@code done}
+      *     is set;</li>
+      * <li>one "start-stop-thread" starts and stops an extra node (the next free index) five times,
+      *     sleeping one second in between, and sets {@code done} when it finishes.</li>
+      * </ul>
+      * The method makes no assertions on cache contents; it passes if all three futures complete
+      * without an exception. The streamer is closed and all grids are stopped on exit.
+      *
+      * @param nodesCntNoCache How many nodes should be started without cache.
+      * @param nodesCntCache How many nodes should be started with cache.
+      * @throws Exception If failed.
+      */
+     protected void checkLoaderMultithreaded(int nodesCntNoCache, int 
nodesCntCache)
+         throws Exception {
+         try {
+             // Start all required nodes.
+             int idx = 1;
+ 
+             for (int i = 0; i < nodesCntNoCache; i++)
+                 startGrid(idx++);
+ 
+             useCache = true;
+ 
+             for (int i = 0; i < nodesCntCache; i++)
+                 startGrid(idx++);
+ 
+             Ignite g1 = grid(1);
+ 
+             // Get and configure loader.
+             final IgniteDataStreamer<Integer, Integer> ldr = 
g1.dataStreamer(null);
+ 
+             ldr.updater(DataStreamerCacheUpdaters.<Integer, 
Integer>individual());
+             ldr.perNodeBufferSize(2);
+ 
+             // Define count of puts.
+             final AtomicInteger idxGen = new AtomicInteger();
+ 
+             final AtomicBoolean done = new AtomicBoolean();
+ 
+             try {
+                 final int totalPutCnt = 50000;
+ 
+                 IgniteInternalFuture<?> fut1 = multithreadedAsync(new 
Callable<Object>() {
+                     @Override public Object call() throws Exception {
+                         Collection<IgniteFuture<?>> futs = new ArrayList<>();
+ 
+                         while (!done.get()) {
+                             int idx = idxGen.getAndIncrement();
+ 
+                             if (idx >= totalPutCnt) {
+                                 info(">>> Stopping producer thread since 
maximum count of puts reached.");
+ 
+                                 break;
+                             }
+ 
+                             futs.add(ldr.addData(idx, idx));
+                         }
+ 
+                         ldr.flush();
+ 
+                         for (IgniteFuture<?> fut : futs)
+                             fut.get();
+ 
+                         return null;
+                     }
+                 }, 5, "producer");
+ 
+                 IgniteInternalFuture<?> fut2 = multithreadedAsync(new 
Callable<Object>() {
+                     @Override public Object call() throws Exception {
+                         while (!done.get()) {
+                             ldr.flush();
+ 
+                             U.sleep(100);
+                         }
+ 
+                         return null;
+                     }
+                 }, 1, "flusher");
+ 
+                 // Define index of node being restarted.
+                 final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 
1;
+ 
+                 IgniteInternalFuture<?> fut3 = multithreadedAsync(new 
Callable<Object>() {
+                     @Override public Object call() throws Exception {
+                         try {
+                             for (int i = 0; i < 5; i++) {
+                                 Ignite g = startGrid(restartNodeIdx);
+ 
+                                 UUID id = g.cluster().localNode().id();
+ 
+                                 info(">>>>>>> Started node: " + id);
+ 
+                                 U.sleep(1000);
+ 
+                                 stopGrid(getTestGridName(restartNodeIdx), 
true);
+ 
+                                 info(">>>>>>> Stopped node: " + id);
+                             }
+                         }
+                         finally {
+                             done.set(true);
+ 
+                             info("Start stop thread finished.");
+                         }
+ 
+                         return null;
+                     }
+                 }, 1, "start-stop-thread");
+ 
+                 fut1.get();
+                 fut2.get();
+                 fut3.get();
+             }
+             finally {
+                 ldr.close(false);
+             }
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ 
+     /**
+      * Checks how the streamer API behaves once a streamer can no longer be used.
+      * <p>
+      * On a single cache-enabled grid the test verifies, in order, that:
+      * <ol>
+      * <li>after {@code close(false)}, {@code addData()} throws {@link IllegalStateException} and the
+      *     streamer future is done and can be obtained without an exception;</li>
+      * <li>requesting a streamer for a cache named {@code "UNKNOWN_CACHE"} throws
+      *     {@link IllegalStateException}, and closing the already closed streamer again leaves its
+      *     future done;</li>
+      * <li>after cancelling a fresh streamer through its future, {@code addData()} throws
+      *     {@link IllegalStateException} and getting the future throws
+      *     {@link IgniteFutureCancelledException};</li>
+      * <li>after the owning grid is stopped, {@code addData()} on a fresh streamer throws
+      *     {@link IllegalStateException} and its future is done.</li>
+      * </ol>
+      * Expectations are checked with JUnit {@code fail()} / {@code assertTrue()} so that they hold
+      * even when the JVM runs without {@code -ea}.
+      *
+      * @throws Exception If failed.
+      */
+     public void testLoaderApi() throws Exception {
+         useCache = true;
+ 
+         try {
+             Ignite g1 = startGrid(1);
+ 
+             IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null);
+ 
+             ldr.close(false);
+ 
+             try {
+                 ldr.addData(0, 0);
+ 
+                 fail("Exception is expected: streamer has been closed.");
+             }
+             catch (IllegalStateException e) {
+                 info("Caught expected exception: " + e);
+             }
+ 
+             assertTrue(ldr.future().isDone());
+ 
+             ldr.future().get();
+ 
+             try {
+                 // Create another loader.
+                 ldr = g1.dataStreamer("UNKNOWN_CACHE");
+ 
+                 fail("Exception is expected: cache does not exist.");
+             }
+             catch (IllegalStateException e) {
+                 info("Caught expected exception: " + e);
+             }
+ 
+             // 'ldr' still refers to the first (already closed) streamer at this point.
+             ldr.close(true);
+ 
+             assertTrue(ldr.future().isDone());
+ 
+             ldr.future().get();
+ 
+             // Create another loader.
+             ldr = g1.dataStreamer(null);
+ 
+             // Cancel with future.
+             ldr.future().cancel();
+ 
+             try {
+                 ldr.addData(0, 0);
+ 
+                 fail("Exception is expected: streamer has been cancelled.");
+             }
+             catch (IllegalStateException e) {
+                 info("Caught expected exception: " + e);
+             }
+ 
+             assertTrue(ldr.future().isDone());
+ 
+             try {
+                 ldr.future().get();
+ 
+                 fail("Exception is expected: streamer future has been cancelled.");
+             }
+             catch (IgniteFutureCancelledException e) {
+                 info("Caught expected exception: " + e);
+             }
+ 
+             // Create another loader.
+             ldr = g1.dataStreamer(null);
+ 
+             // This will close loader.
+             stopGrid(getTestGridName(1), false);
+ 
+             try {
+                 ldr.addData(0, 0);
+ 
+                 fail("Exception is expected: owning grid has been stopped.");
+             }
+             catch (IllegalStateException e) {
+                 info("Caught expected exception: " + e);
+             }
+ 
+             assertTrue(ldr.future().isDone());
+ 
+             ldr.future().get();
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ 
+     /**
+      * Creates a callable that always yields the given integer.
+      *
+      * @param i Value the callable returns, possibly {@code null}.
+      * @return Callable returning {@code i}.
+      */
+     private static Callable<Integer> callable(@Nullable final Integer i) {
+         Callable<Integer> res = new Callable<Integer>() {
+             @Override public Integer call() throws Exception {
+                 return i;
+             }
+         };
+ 
+         return res;
+     }
+ 
+     /**
+      * Creates a closure that adds the given integer to its argument.
+      * <p>
+      * When the closure argument is {@code null}, {@code i} itself is returned, including a
+      * {@code null} {@code i}. When the argument is not {@code null}, the sum is returned; in that
+      * case a {@code null} {@code i} still results in a {@link NullPointerException} from unboxing.
+      *
+      * @param i Value to wrap.
+      * @return Closure.
+      */
+     private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) {
+         return new IgniteClosure<Integer, Integer>() {
+             @Override public Integer apply(Integer e) {
+                 // Deliberately not a ternary: 'e == null ? i : e + i' has type int, which would
+                 // unbox 'i' and throw NullPointerException when both 'e' and 'i' are null.
+                 if (e == null)
+                     return i;
+ 
+                 return e + i;
+             }
+         };
+     }
+ 
+     /**
+      * Creates a closure that ignores its argument's value and always returns the given object.
+      * With assertions enabled it also checks that a non-null argument has the same class as a
+      * non-null {@code obj}.
+      *
+      * @param obj Object the closure returns.
+      * @return Closure returning {@code obj}.
+      */
+     private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) {
+         return new IgniteClosure<T, T>() {
+             @Override public T apply(T e) {
+                 // Types are compared only when both sides are present.
+                 if (e != null && obj != null)
+                     assert e.getClass() == obj.getClass() :
+                         "Expects the same types [e=" + e + ", obj=" + obj + ']';
+ 
+                 return obj;
+             }
+         };
+     }
+ 
+     /**
+      * Creates a closure that returns {@code null} when applied to the expected value and throws
+      * an {@link AssertionError} for any other argument.
+      *
+      * @param exp Value the closure expects to receive, possibly {@code null}.
+      * @return Remove expected cache value closure.
+      */
+     private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) {
+         return new IgniteClosure<T, T>() {
+             @Override public T apply(T act) {
+                 boolean matches;
+ 
+                 if (exp == null)
+                     matches = act == null;
+                 else
+                     matches = exp.equals(act);
+ 
+                 if (!matches)
+                     throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']');
+ 
+                 return null;
+             }
+         };
+     }
+ 
+     /**
+      * Checks that buffered streamer data reaches the cache only on flush or close.
+      * <p>
+      * On a single grid with a {@code LOCAL} cache and a per-node buffer size of 10, nine entries
+      * are added and the cache's local size is asserted to be zero. Five "flush-checker" threads
+      * then each call {@code flush()} and expect the cache size to be 9. One more entry followed by
+      * {@code flush()} must yield size 10, and a further entry followed by {@code close(false)} and
+      * waiting on the streamer future must yield size 11.
+      *
+      * @throws Exception If failed.
+      */
+     public void testFlush() throws Exception {
+         mode = LOCAL;
+ 
+         useCache = true;
+ 
+         try {
+             Ignite g = startGrid();
+ 
+             final IgniteCache<Integer, Integer> c = g.jcache(null);
+ 
+             final IgniteDataStreamer<Integer, Integer> ldr = 
g.dataStreamer(null);
+ 
+             ldr.perNodeBufferSize(10);
+ 
+             for (int i = 0; i < 9; i++)
+                 ldr.addData(i, i);
+ 
+             assertTrue(c.localSize() == 0);
+ 
+             multithreaded(new Callable<Void>() {
+                 @Override
+                 public Void call() throws Exception {
+                     ldr.flush();
+ 
+                     assertEquals(9, c.size());
+ 
+                     return null;
+                 }
+             }, 5, "flush-checker");
+ 
+             ldr.addData(100, 100);
+ 
+             ldr.flush();
+ 
+             assertEquals(10, c.size());
+ 
+             ldr.addData(200, 200);
+ 
+             ldr.close(false);
+ 
+             ldr.future().get();
+ 
+             assertEquals(11, c.size());
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ 
+     /**
+      * Checks that {@code tryFlush()} pushes buffered data into the cache.
+      * <p>
+      * On a single grid with a {@code LOCAL} cache and a per-node buffer size of 10, nine entries
+      * are added and the cache's local size is asserted to be zero. After {@code tryFlush()} and a
+      * 100 ms sleep the cache size must be 9.
+      * <p>
+      * NOTE(review): the fixed 100 ms pause presumably gives the flush time to complete; this looks
+      * timing-sensitive on a slow machine - confirm.
+      *
+      * @throws Exception If failed.
+      */
+     public void testTryFlush() throws Exception {
+         mode = LOCAL;
+ 
+         useCache = true;
+ 
+         try {
+             Ignite g = startGrid();
+ 
+             IgniteCache<Integer, Integer> c = g.jcache(null);
+ 
+             IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+ 
+             ldr.perNodeBufferSize(10);
+ 
+             for (int i = 0; i < 9; i++)
+                 ldr.addData(i, i);
+ 
+             assertTrue(c.localSize() == 0);
+ 
+             ldr.tryFlush();
+ 
+             Thread.sleep(100);
+ 
+             assertEquals(9, c.size());
+ 
+             ldr.close(false);
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ 
+     /**
+      * Checks that buffered data is flushed automatically according to the auto-flush frequency.
+      * <p>
+      * On a single grid with a {@code LOCAL} cache, a local listener counts down a latch of 9 on
+      * every {@code EVT_CACHE_OBJECT_PUT} event. The streamer is configured with a per-node buffer
+      * size of 10, an auto-flush frequency of 3000 and {@code allowOverwrite(true)}, and nine
+      * entries are added. The latch must not open within the first 1000 ms, during which the
+      * cache's local size stays zero. It must open within a further 3000 ms, after which the cache
+      * size must be 9.
+      *
+      * @throws Exception If failed.
+      */
+     public void testFlushTimeout() throws Exception {
+         mode = LOCAL;
+ 
+         useCache = true;
+ 
+         try {
+             Ignite g = startGrid();
+ 
+             final CountDownLatch latch = new CountDownLatch(9);
+ 
+             g.events().localListen(new IgnitePredicate<Event>() {
+                 @Override public boolean apply(Event evt) {
+                     latch.countDown();
+ 
+                     return true;
+                 }
+             }, EVT_CACHE_OBJECT_PUT);
+ 
+             IgniteCache<Integer, Integer> c = g.jcache(null);
+ 
+             assertTrue(c.localSize() == 0);
+ 
+             IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+ 
+             ldr.perNodeBufferSize(10);
+             ldr.autoFlushFrequency(3000);
+             ldr.allowOverwrite(true);
+ 
+             for (int i = 0; i < 9; i++)
+                 ldr.addData(i, i);
+ 
+             assertTrue(c.localSize() == 0);
+ 
+             assertFalse(latch.await(1000, MILLISECONDS));
+ 
+             assertTrue(c.localSize() == 0);
+ 
+             assertTrue(latch.await(3000, MILLISECONDS));
+ 
+             assertEquals(9, c.size());
+ 
+             ldr.close(false);
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ 
+     /**
+      * Checks how streamer updates interact with the cache store, with and without
+      * {@code skipStore}.
+      * <p>
+      * Three grids are started with a {@code TestStore} backed by {@code storeMap}, which is
+      * pre-filled with keys {@code 0..999}. Two streamers are then used, both with
+      * {@code allowOverwrite(true)}:
+      * <ol>
+      * <li>the first, whose {@code skipStore()} is asserted to be {@code false}, removes keys
+      *     {@code 0..999} and adds keys {@code 1000..1999}; the store map must reflect both;</li>
+      * <li>the second, with {@code skipStore(true)}, adds keys {@code 0..999} and removes keys
+      *     {@code 1000..1999}; the store map must be unchanged while the cache returns the added
+      *     values and has no on-heap entry for the removed keys.</li>
+      * </ol>
+      * On exit all grids are stopped and {@code storeMap} is cleared.
+      *
+      * @throws Exception If failed.
+      */
+     public void testUpdateStore() throws Exception {
+         storeMap = new ConcurrentHashMap<>();
+ 
+         try {
+             store = new TestStore();
+ 
+             useCache = true;
+ 
+             Ignite ignite = startGrid(1);
+ 
+             startGrid(2);
+             startGrid(3);
+ 
+             for (int i = 0; i < 1000; i++)
+                 storeMap.put(i, i);
+ 
+             try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
+                 ldr.allowOverwrite(true);
+ 
+                 assertFalse(ldr.skipStore());
+ 
+                 for (int i = 0; i < 1000; i++)
+                     ldr.removeData(i);
+ 
+                 for (int i = 1000; i < 2000; i++)
+                     ldr.addData(i, i);
+             }
+ 
+             for (int i = 0; i < 1000; i++)
+                 assertNull(storeMap.get(i));
+ 
+             for (int i = 1000; i < 2000; i++)
+                 assertEquals(i, storeMap.get(i));
+ 
+             try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
+                 ldr.allowOverwrite(true);
+ 
+                 ldr.skipStore(true);
+ 
+                 for (int i = 0; i < 1000; i++)
+                     ldr.addData(i, i);
+ 
+                 for (int i = 1000; i < 2000; i++)
+                     ldr.removeData(i);
+             }
+ 
+             IgniteCache<Object, Object> cache = ignite.jcache(null);
+ 
+             for (int i = 0; i < 1000; i++) {
+                 assertNull(storeMap.get(i));
+ 
+                 assertEquals(i, cache.get(i));
+             }
+ 
+             for (int i = 1000; i < 2000; i++) {
+                 assertEquals(i, storeMap.get(i));
+ 
+                 assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));
+             }
+         }
+         finally {
+             try {
+                 // Stop the nodes started above, as every other test in this class does; do it while
+                 // the store map still exists so the store never dereferences a null map.
+                 stopAllGrids();
+             }
+             finally {
+                 storeMap = null;
+             }
+         }
+     }
+ 
+     /**
+      * Checks that a user-supplied {@code IgniteDataStreamer.Updater} is applied to streamed
+      * entries.
+      * <p>
+      * Three cache-enabled grids are started. A streamer with {@code allowOverwrite(true)} is given
+      * an updater that asserts the key and value types and puts a new {@code TestObject} whose
+      * value is the streamed value plus one. One hundred entries keyed by the string form of
+      * {@code i} are streamed; after the streamer is closed each key must map to a
+      * {@code TestObject} holding {@code i + 1}.
+      *
+      * @throws Exception If failed.
+      */
+     public void testCustomUserUpdater() throws Exception {
+         useCache = true;
+ 
+         try {
+             Ignite ignite = startGrid(1);
+ 
+             startGrid(2);
+             startGrid(3);
+ 
+             try (IgniteDataStreamer<String, TestObject> ldr = 
ignite.dataStreamer(null)) {
+                 ldr.allowOverwrite(true);
+ 
+                 ldr.updater(new IgniteDataStreamer.Updater<String, 
TestObject>() {
+                     @Override public void update(IgniteCache<String, 
TestObject> cache,
+                         Collection<Map.Entry<String, TestObject>> entries) {
+                         for (Map.Entry<String, TestObject> e : entries) {
+                             assertTrue(e.getKey() instanceof String);
+                             assertTrue(e.getValue() instanceof TestObject);
+ 
+                             cache.put(e.getKey(), new 
TestObject(e.getValue().val + 1));
+                         }
+                     }
+                 });
+ 
+                 for (int i = 0; i < 100; i++)
+                     ldr.addData(String.valueOf(i), new TestObject(i));
+             }
+ 
+             IgniteCache<String, TestObject> cache = ignite.jcache(null);
+ 
+             for (int i = 0; i < 100; i++) {
+                 TestObject val = cache.get(String.valueOf(i));
+ 
+                 assertNotNull(val);
+                 assertEquals(i + 1, val.val);
+             }
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ 
+     /**
+      * Integer wrapper used as a cache value by {@code testCustomUserUpdater()}. Equality and hash
+      * code depend solely on the wrapped integer.
+      */
+     private static class TestObject {
+         /** Wrapped integer. */
+         private final int val;
+ 
+         /**
+          * @param val Integer to wrap.
+          */
+         private TestObject(int val) {
+             this.val = val;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean equals(Object o) {
+             if (o == this)
+                 return true;
+ 
+             if (o == null)
+                 return false;
+ 
+             if (o.getClass() != getClass())
+                 return false;
+ 
+             return ((TestObject)o).val == val;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public int hashCode() {
+             return val;
+         }
+     }
+ 
+     /**
+      * Cache store that keeps its data in the static {@code storeMap} of the enclosing test class.
+      * Every operation dereferences {@code storeMap} directly, so the map has to be assigned
+      * before the store is used.
+      */
+     private static class TestStore extends CacheStoreAdapter<Object, Object> {
+         /**
+          * {@inheritDoc}
+          * <p>
+          * Returns the value mapped to {@code key} in {@code storeMap}, or {@code null} if absent.
+          */
+         @Nullable @Override public Object load(Object key) {
+             return storeMap.get(key);
+         }
+ 
+         /**
+          * {@inheritDoc}
+          * <p>
+          * Puts the entry's key and value into {@code storeMap}.
+          */
+         @Override public void write(Cache.Entry<?, ?> entry) {
+             storeMap.put(entry.getKey(), entry.getValue());
+         }
+ 
+         /**
+          * {@inheritDoc}
+          * <p>
+          * Removes {@code key} from {@code storeMap}.
+          */
+         @Override public void delete(Object key) {
+             storeMap.remove(key);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java
index 0000000,3d100e1..bf5707e
mode 000000,100644..100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java
@@@ -1,0 -1,197 +1,196 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.datastreamer;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.spi.discovery.tcp.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+ import org.apache.ignite.testframework.junits.common.*;
+ import org.jdk8.backport.*;
+ 
+ import java.util.concurrent.*;
+ 
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
+ import static org.apache.ignite.cache.CacheMode.*;
+ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+ import static org.apache.ignite.events.EventType.*;
+ 
+ /**
+  * Data streamer performance test. Compares group lock data streamer to traditional lock.
+  * <p>
+  * Disable assertions and give at least 2 GB heap to run this test.
+  */
+ public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest {
+     /** IP finder handed to the discovery SPI of every node configured by this test. */
+     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+ 
+     /** Number of nodes started with the cache configured. */
+     private static final int GRID_CNT = 3;
+ 
+     /** Exclusive upper bound of the random keys being streamed; also used to derive the cache start size. */
+     private static final int ENTRY_CNT = 80000;
+ 
+     /** Whether {@link #getConfiguration(String)} adds the cache configuration to the node being configured. */
+     private boolean useCache;
+ 
+     /** Pool of pre-generated values to stream; filled in {@link #beforeTestsStarted()}. */
+     private String[] vals = new String[2048];
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * When {@link #useCache} is set, a {@code PARTITIONED}, {@code FULL_SYNC} cache with one backup,
+      * swap disabled and a {@code null} near configuration is added; otherwise the cache configuration
+      * is set to an empty list.
+      */
+     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+         IgniteConfiguration cfg = super.getConfiguration(gridName);
+ 
+         TcpDiscoverySpi spi = new TcpDiscoverySpi();
+ 
+         spi.setIpFinder(IP_FINDER);
+ 
+         cfg.setDiscoverySpi(spi);
+ 
+         cfg.setIncludeProperties();
+ 
+         cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+ 
+         cfg.setConnectorConfiguration(null);
+ 
+         cfg.setPeerClassLoadingEnabled(true);
+ 
+         if (useCache) {
+             CacheConfiguration cc = defaultCacheConfiguration();
+ 
+             cc.setCacheMode(PARTITIONED);
+ 
 -            cc.setDistributionMode(PARTITIONED_ONLY);
++            cc.setNearConfiguration(null);
+             cc.setWriteSynchronizationMode(FULL_SYNC);
+             cc.setStartSize(ENTRY_CNT / GRID_CNT);
+             cc.setSwapEnabled(false);
+ 
+             cc.setBackups(1);
+ 
+             cfg.setCacheSanityCheckEnabled(false);
+             cfg.setCacheConfiguration(cc);
+         }
+         else
+             cfg.setCacheConfiguration();
+ 
+         return cfg;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Fills {@link #vals} with random strings built from the letters {@code 'a'} to {@code 't'}; the
+      * length of each string is picked with {@code nextInt(128, 512)}. Every generated value is logged.
+      */
+     @Override protected void beforeTestsStarted() throws Exception {
+         super.beforeTestsStarted();
+ 
+         for (int i = 0; i < vals.length; i++) {
+             int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
+ 
+             StringBuilder sb = new StringBuilder(valLen);
+ 
+             // The cast is required: 'a' + int is an int, and append(int) would append its decimal
+             // digits ("97".."116") instead of a single letter.
+             for (int j = 0; j < valLen; j++)
+                 sb.append((char)('a' + ThreadLocalRandom8.current().nextInt(20)));
+ 
+             vals[i] = sb.toString();
+ 
+             info("Value: " + vals[i]);
+         }
+     }
+ 
+     /**
+      * Runs the streaming load via {@link #doTest()}.
+      *
+      * @throws Exception If failed.
+      */
+     public void testPerformance() throws Exception {
+         doTest();
+     }
+ 
+     /**
+      * Starts {@link #GRID_CNT} nodes with the cache and one more node without it, then streams random
+      * entries through a data streamer obtained from the cache-less node. A background daemon thread
+      * logs the add rate every 10 seconds.
+      * <p>
+      * The loader loop has no exit condition, so a loader stops only if an exception is thrown from
+      * inside the loop. All grids are stopped in the {@code finally} block.
+      *
+      * @throws Exception If failed.
+      */
+     private void doTest() throws Exception {
+         System.gc();
+         System.gc();
+         System.gc();
+ 
+         try {
+             // The flag is read by getConfiguration(): the first GRID_CNT nodes get the cache ...
+             useCache = true;
+ 
+             startGridsMultiThreaded(GRID_CNT);
+ 
+             // ... while the node that owns the streamer is started without it.
+             useCache = false;
+ 
+             Ignite ignite = startGrid();
+ 
+             final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
+ 
+             ldr.perNodeBufferSize(8192);
+             ldr.updater(DataStreamerCacheUpdaters.<Integer, String>batchedSorted());
+             ldr.autoFlushFrequency(0);
+ 
+             // Number of addData() calls since the last report.
+             final LongAdder cnt = new LongAdder();
+ 
+             long start = U.currentTimeMillis();
+ 
+             // Reporter: every 10 seconds logs the per-second average and resets the counter.
+             Thread t = new Thread(new Runnable() {
+                 @SuppressWarnings("BusyWait")
+                 @Override public void run() {
+                     while (true) {
+                         try {
+                             Thread.sleep(10000);
+                         }
+                         catch (InterruptedException ignored) {
+                             break;
+                         }
+ 
+                         info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
+                     }
+                 }
+             });
+ 
+             t.setDaemon(true);
+ 
+             t.start();
+ 
+             int threadNum = 2;//Runtime.getRuntime().availableProcessors();
+ 
+             multithreaded(new Callable<Object>() {
+                 @SuppressWarnings("InfiniteLoopStatement")
+                 @Override public Object call() throws Exception {
+                     ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
+ 
+                     while (true) {
+                         int i = rnd.nextInt(ENTRY_CNT);
+ 
+                         ldr.addData(i, vals[rnd.nextInt(vals.length)]);
+ 
+                         cnt.increment();
+                     }
+                 }
+             }, threadNum, "loader");
+ 
+             info("Closing loader...");
+ 
+             ldr.close(false);
+ 
+             long duration = U.currentTimeMillis() - start;
+ 
+             info("Finished performance test. Duration: " + duration + "ms.");
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java
index 677703d,50f3145..5f58800
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java
@@@ -115,9 -118,8 +118,8 @@@ public class IgfsCachePerBlockLruEvicti
  
          metaCacheCfg.setName("metaCache");
          metaCacheCfg.setCacheMode(REPLICATED);
 -        
metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
 +        metaCacheCfg.setNearConfiguration(null);
          
metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-         metaCacheCfg.setQueryIndexEnabled(false);
          metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
  
          IgniteConfiguration cfg = new IgniteConfiguration();
@@@ -173,9 -174,8 +174,8 @@@
  
          metaCacheCfg.setName("metaCache");
          metaCacheCfg.setCacheMode(REPLICATED);
 -        
metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
 +        metaCacheCfg.setNearConfiguration(null);
          
metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-         metaCacheCfg.setQueryIndexEnabled(false);
          metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
  
          IgniteConfiguration cfg = new IgniteConfiguration();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCacheSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java
index 334c998,6349cca..6457121
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java
@@@ -131,9 -134,8 +134,8 @@@ public class IgfsMetricsSelfTest extend
  
          metaCacheCfg.setName("metaCache");
          metaCacheCfg.setCacheMode(REPLICATED);
 -        
metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
 +        metaCacheCfg.setNearConfiguration(null);
          
metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-         metaCacheCfg.setQueryIndexEnabled(false);
          metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
  
          IgniteConfiguration cfg = new IgniteConfiguration();
@@@ -183,9 -184,8 +184,8 @@@
  
          metaCacheCfg.setName("metaCache");
          metaCacheCfg.setCacheMode(REPLICATED);
 -        
metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
 +        metaCacheCfg.setNearConfiguration(null);
          
metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-         metaCacheCfg.setQueryIndexEnabled(false);
          metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
  
          IgniteConfiguration cfg = new IgniteConfiguration();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsAbstractRecordResolverSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java
index 926da8d,a4afde9..476298d
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java
@@@ -23,8 -24,7 +24,6 @@@ import org.apache.ignite.internal.util.
  import org.apache.ignite.spi.discovery.tcp.*;
  import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
  
- import java.util.*;
- 
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
  import static org.apache.ignite.cache.CacheMode.*;
  import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  import static org.apache.ignite.events.EventType.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java
index 19c4551,10134b7..91994f5
--- 
a/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java
@@@ -25,8 -25,9 +25,8 @@@ import org.apache.ignite.spi.discovery.
  import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
  import org.apache.ignite.testframework.junits.common.*;
  
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
  import static org.apache.ignite.cache.CacheMode.*;
- import static org.apache.ignite.cache.CachePreloadMode.*;
+ import static org.apache.ignite.cache.CacheRebalanceMode.*;
  import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  
  /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 55a1b58,f0cec80..68846c3
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@@ -1114,10 -1115,9 +1114,9 @@@ public abstract class GridAbstractTest 
          CacheConfiguration cfg = new CacheConfiguration();
  
          cfg.setStartSize(1024);
-         cfg.setQueryIndexEnabled(true);
          cfg.setAtomicWriteOrderMode(PRIMARY);
          cfg.setAtomicityMode(TRANSACTIONAL);
 -        cfg.setDistributionMode(NEAR_PARTITIONED);
 +        cfg.setNearConfiguration(new NearCacheConfiguration());
          cfg.setWriteSynchronizationMode(FULL_SYNC);
          cfg.setEvictionPolicy(null);
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryModeSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --cc 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index ffe0b1a,c3a3da3..9965b05
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@@ -22,8 -23,8 +23,9 @@@ import org.apache.ignite.cache.query.*
  import org.apache.ignite.cache.query.annotations.*;
  import org.apache.ignite.configuration.*;
  import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.affinity.*;
  import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.CacheObject;
  import org.apache.ignite.internal.processors.cache.query.*;
  import org.apache.ignite.internal.processors.query.*;
  import org.apache.ignite.internal.processors.query.h2.opt.*;
@@@ -293,19 -246,7 +247,20 @@@ public class IgniteH2Indexing implement
      }
  
      /**
 +     * Drops DB schema if it exists.
 +     *
 +     * @param schema Schema name.
 +     * @throws IgniteCheckedException If failed to drop db schema.
 +     */
 +    private void dropSchemaIfExists(String schema) throws 
IgniteCheckedException {
-         executeStatement("DROP SCHEMA IF EXISTS \"" + schema + '"');
++        executeStatement("INFORMATION_SCHEMA", "DROP SCHEMA IF EXISTS \"" + 
schema + '"');
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Dropped H2 schema for index database: " + schema);
 +    }
 +
 +    /**
+      * @param schema Schema
       * @param sql SQL statement.
       * @throws IgniteCheckedException If failed.
       */
@@@ -1300,53 -1184,18 +1198,33 @@@
              };
      }
  
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Calls {@code registerSpace} with the cache name and, only when it returns {@code true},
 +     * calls {@code createSchemaIfAbsent} for the schema derived from that name.
 +     */
 +    @Override public void onCacheStarted(GridCacheContext ctx) throws 
IgniteCheckedException {
 +        if (registerSpace(ctx.name()))
 +            createSchemaIfAbsent(schema(ctx.name()));
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Calls {@code unregisterSpace} with the cache name and, only when it returns {@code true},
 +     * drops the schema derived from that name and removes it from {@code schemas}.
 +     */
 +    @Override public void onCacheStopped(GridCacheContext ctx) throws 
IgniteCheckedException {
 +        if (unregisterSpace(ctx.name())) {
 +            dropSchemaIfExists(schema(ctx.name()));
 +
 +            schemas.remove(schema(ctx.name()));
 +        }
 +    }
 +
      /**
-      * Runs initial script.
-      *
-      * @throws IgniteCheckedException If failed.
-      * @throws SQLException If failed.
-      */
-     private void runInitScript() throws IgniteCheckedException, SQLException {
-         String initScriptPath = cfg.getInitialScriptPath();
- 
-         if (initScriptPath == null)
-             return;
- 
-         try (PreparedStatement p = 
connectionForThread(null).prepareStatement("RUNSCRIPT FROM ? CHARSET 'UTF-8'")) 
{
-             p.setString(1, initScriptPath);
- 
-             p.execute();
-         }
-     }
- 
-     /**
       * Registers SQL functions.
       *
-      * @throws SQLException If failed.
+      * @param schema Schema.
+      * @param clss Classes.
       * @throws IgniteCheckedException If failed.
       */
-     private void createSqlFunctions() throws SQLException, 
IgniteCheckedException {
-         Class<?>[] idxCustomFuncClss = cfg.getIndexCustomFunctionClasses();
- 
-         if (F.isEmpty(idxCustomFuncClss))
+     private void createSqlFunctions(String schema, Class<?>[] clss) throws 
IgniteCheckedException {
+         if (F.isEmpty(clss))
              return;
  
-         for (Class<?> cls : idxCustomFuncClss) {
+         for (Class<?> cls : clss) {
              for (Method m : cls.getDeclaredMethods()) {
                  QuerySqlFunction ann = 
m.getAnnotation(QuerySqlFunction.class);
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFieldsQuerySelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 144655f,6a672c0..c0c54d0
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@@ -91,10 -93,24 +93,22 @@@ public class GridCacheCrossCacheQuerySe
          cc.setName(name);
          cc.setCacheMode(mode);
          
cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-         cc.setPreloadMode(SYNC);
+         cc.setRebalanceMode(SYNC);
          cc.setSwapEnabled(true);
 -        cc.setEvictNearSynchronized(false);
          cc.setAtomicityMode(TRANSACTIONAL);
 -        cc.setDistributionMode(NEAR_PARTITIONED);
  
+         if (mode == CacheMode.PARTITIONED)
+             cc.setIndexedTypes(
+                 Integer.class, FactPurchase.class
+             );
+         else if (mode == CacheMode.REPLICATED)
+             cc.setIndexedTypes(
+                 Integer.class, DimProduct.class,
+                 Integer.class, DimStore.class
+             );
+         else
+             throw new IllegalStateException("mode: " + mode);
+ 
          return cc;
      }
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapAndSwapSelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapAndSwapSelfTest.java
index a171f98,ea34e8a..964e3a2
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapAndSwapSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapAndSwapSelfTest.java
@@@ -127,9 -128,13 +127,12 @@@ public class GridCacheOffHeapAndSwapSel
          cacheCfg.setBackups(1);
          cacheCfg.setOffHeapMaxMemory(OFFHEAP_MEM);
          cacheCfg.setEvictSynchronized(true);
 -        cacheCfg.setEvictNearSynchronized(true);
          cacheCfg.setEvictSynchronizedKeyBufferSize(1);
          cacheCfg.setAtomicityMode(TRANSACTIONAL);
 -        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+         cacheCfg.setIndexedTypes(
+             Long.class, Long.class
+         );
 +        cacheCfg.setNearConfiguration(new NearCacheConfiguration());
  
          cacheCfg.setEvictionPolicy(null);
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java
index 84d8290,6310c39..7568f26
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java
@@@ -62,9 -62,10 +62,9 @@@ public class GridIndexingWithNoopSwapSe
  
          cc.setCacheMode(PARTITIONED);
          
cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-         cc.setPreloadMode(SYNC);
+         cc.setRebalanceMode(SYNC);
          cc.setSwapEnabled(true);
 -        cc.setDistributionMode(CacheDistributionMode.NEAR_PARTITIONED);
 -        cc.setEvictNearSynchronized(false);
 +        cc.setNearConfiguration(new NearCacheConfiguration());
          cc.setEvictionPolicy(new CacheFifoEvictionPolicy(1000));
          cc.setBackups(1);
          cc.setAtomicityMode(TRANSACTIONAL);

Reply via email to