http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredSelfTest.java
new file mode 100644
index 0000000..ac37cf1
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+
+/**
+ * {@link GridCacheOffHeapTieredAbstractSelfTest} variant that uses {@code TRANSACTIONAL} atomicity mode.
+ */
+public class GridCacheOffHeapTieredSelfTest extends 
GridCacheOffHeapTieredAbstractSelfTest {
+    /** {@inheritDoc} */
+    @SuppressWarnings("RedundantMethodOverride")
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapUpdateSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapUpdateSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapUpdateSelfTest.java
new file mode 100644
index 0000000..0754ec4
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapUpdateSelfTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Regression checks for a specific support issue, run against a transactional {@code OFFHEAP_TIERED} cache.
+ */
+public class GridCacheOffheapUpdateSelfTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} Configures one unnamed partitioned transactional cache in {@code OFFHEAP_TIERED} mode. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(GridCacheMode.PARTITIONED);
+        ccfg.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY);
+        ccfg.setAtomicityMode(GridCacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setOffHeapMaxMemory(0); // NOTE(review): 0 presumably means "no off-heap size limit" - confirm.
+        ccfg.setMemoryMode(GridCacheMemoryMode.OFFHEAP_TIERED);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateInPessimisticTxOnRemoteNode() throws Exception {
+        try {
+            Ignite ignite = startGrids(2);
+
+            GridCache<Object, Object> rmtCache = ignite.cache(null);
+
+            int key = 0;
+
+            while (!rmtCache.affinity().isPrimary(grid(1).localNode(), key))
+                key++; // Advance to the first key for which grid 1 is the primary node.
+
+            GridCache<Object, Object> locCache = grid(1).cache(null);
+
+            try (IgniteTx tx = locCache.txStart(PESSIMISTIC, REPEATABLE_READ)) 
{
+                locCache.putxIfAbsent(key, 0); // Seed the key with 0 through grid 1's cache.
+
+                tx.commit();
+            }
+
+            try (IgniteTx tx = rmtCache.txStart(PESSIMISTIC, REPEATABLE_READ)) 
{
+                assertEquals(0, rmtCache.get(key)); // Seeded value must be visible through rmtCache.
+
+                rmtCache.putx(key, 1);
+
+                tx.commit();
+            }
+
+            try (IgniteTx tx = rmtCache.txStart(PESSIMISTIC, REPEATABLE_READ)) 
{
+                assertEquals(1, rmtCache.get(key)); // Update from the previous tx must be visible.
+
+                rmtCache.putx(key, 2);
+
+                tx.commit();
+            }
+        }
+        finally {
+            stopAllGrids(); // Runs even when an assertion above failed.
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadEvictedPartition() throws Exception {
+        try {
+            Ignite grid = startGrid(0);
+
+            GridCache<Object, Object> cache = grid.cache(null);
+
+            for (int i = 0; i < 30; i++)
+                cache.put(i, 0); // Populate keys 0..29 before grid 1 is started.
+
+            startGrid(1);
+
+            awaitPartitionMapExchange();
+
+            for (int i = 0; i < 30; i++)
+                grid(1).cache(null).put(i, 10); // Overwrite every key with 10 through grid 1.
+
+            // Find a key that does not belong to started node anymore.
+            int key = 0;
+
+            ClusterNode locNode = grid.cluster().localNode();
+
+            for (;key < 30; key++) {
+                if (!cache.affinity().isPrimary(locNode, key) && 
!cache.affinity().isBackup(locNode, key))
+                    break; // Grid 0 is neither primary nor backup for this key.
+            }
+
+            assertEquals(10, cache.get(key)); // NOTE(review): if no such key was found, key == 30 (never stored) and this fails.
+
+            try (IgniteTx ignored = cache.txStart(OPTIMISTIC, 
REPEATABLE_READ)) {
+                assertEquals(10, cache.get(key)); // Same read inside an optimistic tx.
+            }
+
+            try (IgniteTx ignored = cache.txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+                assertEquals(10, cache.get(key)); // Same read inside a pessimistic tx.
+            }
+        }
+        finally {
+            stopAllGrids(); // Runs even when an assertion above failed.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
new file mode 100644
index 0000000..56fc683
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+
+/**
+ * Checks ordered preloading: the cache with preload order 1 must not finish preloading after the cache with order 2.
+ */
+public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest 
{
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Number of grids in test. */
+    private static final int GRID_CNT = 4;
+
+    /** First cache name (configured with preload order 1). */
+    public static final String FIRST_CACHE_NAME = "first";
+
+    /** Second cache name (configured with preload order 2). */
+    public static final String SECOND_CACHE_NAME = "second";
+
+    /** First cache mode; assigned by {@code checkPreloadOrder} before any grid is started. */
+    private GridCacheMode firstCacheMode;
+
+    /** Second cache mode; assigned by {@code checkPreloadOrder} before any grid is started. */
+    private GridCacheMode secondCacheMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(
+            cacheConfig(firstCacheMode, 1, FIRST_CACHE_NAME),
+            cacheConfig(secondCacheMode, 2, SECOND_CACHE_NAME));
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param preloadOrder Preload order.
+     * @param name Cache name.
+     * @return Cache configuration with {@code ASYNC} preload mode.
+     */
+    private CacheConfiguration cacheConfig(GridCacheMode cacheMode, int 
preloadOrder, String name) {
+        CacheConfiguration cfg = defaultCacheConfiguration();
+
+        cfg.setName(name);
+        cfg.setCacheMode(cacheMode);
+        cfg.setPreloadOrder(preloadOrder);
+        cfg.setPreloadMode(ASYNC);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPreloadOrderPartitionedPartitioned() throws Exception {
+        checkPreloadOrder(PARTITIONED, PARTITIONED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPreloadOrderReplicatedReplicated() throws Exception {
+        checkPreloadOrder(REPLICATED, REPLICATED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPreloadOrderPartitionedReplicated() throws Exception {
+        checkPreloadOrder(PARTITIONED, REPLICATED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPreloadOrderReplicatedPartitioned() throws Exception {
+        checkPreloadOrder(REPLICATED, PARTITIONED);
+    }
+
+    /**
+     * @param first Mode of the cache with preload order 1.
+     * @param second Mode of the cache with preload order 2.
+     * @throws Exception If failed.
+     */
+    private void checkPreloadOrder(GridCacheMode first, GridCacheMode second) 
throws Exception {
+        firstCacheMode = first; // Read by getConfiguration() when the grids below are started.
+        secondCacheMode = second;
+
+        Ignite g = startGrid(0);
+
+        try {
+            GridCache<Object, Object> cache = g.cache(FIRST_CACHE_NAME);
+
+            // Put some data into cache.
+            for (int i = 0; i < 1000; i++)
+                cache.put(i, i);
+
+            for (int i = 1; i < GRID_CNT; i++)
+                startGrid(i);
+
+            // For first node in topology replicated preloader gets completed 
right away.
+            for (int i = 1; i < GRID_CNT; i++) {
+                GridKernal kernal = (GridKernal)grid(i);
+
+                GridFutureAdapter<?> fut1 = 
(GridFutureAdapter<?>)kernal.internalCache(FIRST_CACHE_NAME).preloader()
+                    .syncFuture();
+                GridFutureAdapter<?> fut2 = 
(GridFutureAdapter<?>)kernal.internalCache(SECOND_CACHE_NAME).preloader()
+                    .syncFuture();
+
+                fut1.get(); // Block until both preload futures are done so their end times are set.
+                fut2.get();
+
+                assertTrue("[i=" + i + ", fut1=" + fut1 + ", fut2=" + fut2 + 
']', fut1.endTime() <= fut2.endTime()); // Order 1 must not finish after order 2.
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java
new file mode 100644
index 0000000..01a1212
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.swapspace.file.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.configuration.IgniteDeploymentMode.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+
+/**
+ * Checks that entries holding a peer-deployed value class disappear from cache and swap/off-heap once the putting node stops.
+ */
+public class GridCacheP2PUndeploySelfTest extends GridCommonAbstractTest {
+    /** Fully qualified name of the test value class loaded through the external class loader. */
+    private static final String TEST_VALUE = 
"org.gridgain.grid.tests.p2p.GridCacheDeploymentTestValue3";
+
+    /** Value passed to {@code setOffHeapMaxMemory} when the off-heap variant is tested. */
+    private static final long OFFHEAP = 0;// 4 * 1024 * 1024;
+
+    /** IP finder used by the discovery SPI. */
+    private final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Generates the value stored in each node's {@code IDX_ATTR} user attribute. */
+    private final AtomicInteger idxGen = new AtomicInteger();
+
+    /** Preload mode applied to both caches. */
+    private GridCachePreloadMode mode = SYNC;
+
+    /** Whether caches are configured with off-heap memory ({@code true}) or with swap ({@code false}). */
+    private boolean offheap;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setNetworkTimeout(2000);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(spi);
+
+        cfg.setMarshaller(new IgniteJdkMarshaller());
+
+        cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+
+        CacheConfiguration repCacheCfg = defaultCacheConfiguration();
+
+        repCacheCfg.setName("replicated");
+        repCacheCfg.setCacheMode(REPLICATED);
+        repCacheCfg.setPreloadMode(mode);
+        
repCacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        repCacheCfg.setQueryIndexEnabled(false);
+        repCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        if (offheap)
+            repCacheCfg.setOffHeapMaxMemory(OFFHEAP);
+        else
+            repCacheCfg.setSwapEnabled(true);
+
+        CacheConfiguration partCacheCfg = defaultCacheConfiguration();
+
+        partCacheCfg.setName("partitioned");
+        partCacheCfg.setCacheMode(PARTITIONED);
+        partCacheCfg.setPreloadMode(mode);
+        partCacheCfg.setAffinity(new GridCacheModuloAffinityFunction(11, 1));
+        
partCacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        partCacheCfg.setEvictNearSynchronized(false);
+        partCacheCfg.setQueryIndexEnabled(false);
+        partCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        partCacheCfg.setDistributionMode(NEAR_PARTITIONED);
+
+        if (offheap)
+            partCacheCfg.setOffHeapMaxMemory(OFFHEAP);
+        else
+            partCacheCfg.setSwapEnabled(true);
+
+        cfg.setCacheConfiguration(repCacheCfg, partCacheCfg);
+
+        cfg.setDeploymentMode(SHARED);
+        
cfg.setPeerClassLoadingLocalClassPathExclude(GridCacheP2PUndeploySelfTest.class.getName());
+
+        
cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, 
idxGen.getAndIncrement()));
+
+        return cfg;
+    }
+
+    /** @throws Exception If failed. */
+    public void testSwapP2PReplicated() throws Exception {
+        offheap = false; // Swap-enabled configuration.
+
+        checkP2PUndeploy("replicated");
+    }
+
+    /** @throws Exception If failed. */
+    public void testOffHeapP2PReplicated() throws Exception {
+        offheap = true; // Off-heap configuration.
+
+        checkP2PUndeploy("replicated");
+    }
+
+    /** @throws Exception If failed. */
+    public void testSwapP2PPartitioned() throws Exception {
+        offheap = false; // Swap-enabled configuration.
+
+        checkP2PUndeploy("partitioned");
+    }
+
+    /** @throws Exception If failed. */
+    public void testOffheapP2PPartitioned() throws Exception {
+        offheap = true; // Off-heap configuration.
+
+        checkP2PUndeploy("partitioned");
+    }
+
+    /** @throws Exception If failed. */
+    public void testSwapP2PReplicatedNoPreloading() throws Exception {
+        mode = NONE; // Preloading disabled for both caches.
+        offheap = false;
+
+        checkP2PUndeploy("replicated");
+    }
+
+    /** @throws Exception If failed. */
+    public void testOffHeapP2PReplicatedNoPreloading() throws Exception {
+        mode = NONE; // Preloading disabled for both caches.
+        offheap = true;
+
+        checkP2PUndeploy("replicated");
+    }
+
+    /** @throws Exception If failed. */
+    public void testSwapP2PPartitionedNoPreloading() throws Exception {
+        mode = NONE; // Preloading disabled for both caches.
+        offheap = false;
+
+        checkP2PUndeploy("partitioned");
+    }
+
+    /** @throws Exception If failed. */
+    public void testOffHeapP2PPartitionedNoPreloading() throws Exception {
+        mode = NONE; // Preloading disabled for both caches.
+        offheap = true;
+
+        checkP2PUndeploy("partitioned");
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param g Grid.
+     * @return Off-heap entry count when {@code offheap} is set, swap space size otherwise.
+     * @throws IgniteCheckedException If failed.
+     */
+    private long size(String cacheName, GridKernal g) throws 
IgniteCheckedException {
+        if (offheap)
+            return g.cache(cacheName).offHeapEntriesCount();
+
+        return g.context().swap().swapSize(swapSpaceName(cacheName, g));
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @throws Exception If failed.
+     */
+    private void checkP2PUndeploy(String cacheName) throws Exception {
+        assert !F.isEmpty(cacheName);
+
+        ClassLoader ldr = getExternalClassLoader();
+
+        Class valCls = ldr.loadClass(TEST_VALUE);
+
+        try {
+            Ignite ignite1 = startGrid(1);
+            GridKernal grid2 = (GridKernal)startGrid(2);
+
+            GridCache<Integer, Object> cache1 = ignite1.cache(cacheName);
+            GridCache<Integer, Object> cache2 = grid2.cache(cacheName);
+
+            Object v1 = valCls.newInstance();
+
+            cache1.put(1, v1);
+            cache1.put(2, valCls.newInstance());
+            cache1.put(3, valCls.newInstance());
+            cache1.put(4, valCls.newInstance());
+
+            info("Stored value in cache1 [v=" + v1 + ", ldr=" + 
v1.getClass().getClassLoader() + ']');
+
+            Object v2 = cache2.get(1);
+
+            assert v2 != null;
+
+            info("Read value from cache2 [v=" + v2 + ", ldr=" + 
v2.getClass().getClassLoader() + ']');
+
+            assert v2 != null; // Redundant with the check above.
+            assert v2.toString().equals(v1.toString());
+            assert 
!v2.getClass().getClassLoader().equals(getClass().getClassLoader());
+            assert 
v2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader");
+
+            assertTrue(cache2.evict(2)); // assertTrue, not assert: the eviction must run even with assertions disabled.
+            assertTrue(cache2.evict(3));
+            assertTrue(cache2.evict(4));
+
+            long swapSize = size(cacheName, grid2);
+
+            info("Swap size: " + swapSize);
+
+            assert swapSize > 0;
+
+            stopGrid(1); // Stop the node that put the values.
+
+            assertTrue(waitCacheEmpty(cache2, 10000)); // assertTrue, not assert: the wait must run even with assertions disabled.
+
+            for (int i = 0; i < 3; i++) { // Up to three size checks, sleeping 1000 ms after each of the first two failures.
+                swapSize = size(cacheName, grid2);
+
+                if (swapSize > 0) {
+                    if (i < 2) {
+                        U.warn(log, "Swap size check failed (will retry in 
1000 ms): " + swapSize);
+
+                        U.sleep(1000);
+
+                        continue;
+                    }
+
+                    fail("Swap size check failed: " + swapSize);
+                }
+                else if (swapSize == 0)
+                    break;
+                else
+                    assert false : "Negative swap size: " + swapSize;
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param grid Kernal.
+     * @return Name for swap space (taken from the DHT cache context when the cache context is a near one).
+     */
+    private String swapSpaceName(String cacheName, GridKernal grid) {
+        GridCacheContext<Object, Object> cctx = 
grid.internalCache(cacheName).context();
+
+        return CU.swapSpaceName(cctx.isNear() ? cctx.near().dht().context() : 
cctx);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param timeout Timeout in milliseconds; the cache is polled every 500 ms until it elapses.
+     * @return {@code True} if success.
+     * @throws InterruptedException If thread was interrupted.
+     */
+    @SuppressWarnings({"BusyWait"})
+    private boolean waitCacheEmpty(GridCacheProjection<Integer, Object> cache, 
long timeout)
+        throws InterruptedException {
+        assert cache != null;
+        assert timeout >= 0;
+
+        long end = System.currentTimeMillis() + timeout;
+
+        while (end - System.currentTimeMillis() >= 0) {
+            if (cache.isEmpty())
+                return true;
+
+            Thread.sleep(500);
+        }
+
+        return cache.isEmpty(); // One last check after the deadline has passed.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
new file mode 100644
index 0000000..e5c9d21
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+import static 
org.apache.ignite.cache.affinity.consistenthash.GridCacheConsistentHashAffinityFunction.*;
+
+/**
+ * Prints how evenly {@link GridCacheConsistentHashAffinityFunction} spreads partitions over emulated nodes.
+ */
+public class GridCachePartitionedAffinitySpreadTest extends 
GridCommonAbstractTest {
+    /** Exclusive upper bound for the number of emulated nodes. */
+    public static final int NODES_CNT = 50;
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionSpreading() throws Exception {
+        System.out.printf("%6s, %6s, %6s, %6s, %8s\n", "Nodes", "Reps", "Min", 
"Max", "Dev");
+
+        for (int i = 5; i < NODES_CNT; i = i * 3 / 2) { // Node counts 5, 7, 10, 15, 22, 33, 49.
+            for (int replicas = 128; replicas <= 4096; replicas*=2) { // Replica counts 128, 256, ..., 4096.
+                Collection<ClusterNode> nodes = createNodes(i, replicas);
+
+                GridCacheConsistentHashAffinityFunction aff = new 
GridCacheConsistentHashAffinityFunction(false, 10000);
+
+                checkDistribution(aff, nodes);
+            }
+
+            System.out.println();
+        }
+    }
+
+    /**
+     * @param nodesCnt Nodes count.
+     * @param replicas Value each node returns for the {@code DFLT_REPLICA_COUNT_ATTR_NAME} attribute.
+     * @return Collection of test nodes.
+     */
+    private Collection<ClusterNode> createNodes(int nodesCnt, int replicas) {
+        Collection<ClusterNode> nodes = new ArrayList<>(nodesCnt);
+
+        for (int i = 0; i < nodesCnt; i++)
+            nodes.add(new TestRichNode(replicas));
+
+        return nodes;
+    }
+
+    /**
+     * @param aff Affinity to check; every partition must map to exactly one node.
+     * @param nodes Collection of nodes to test on.
+     */
+    private void checkDistribution(GridCacheConsistentHashAffinityFunction 
aff, Collection<ClusterNode> nodes) {
+        Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size());
+
+        for (int part = 0; part < aff.getPartitions(); part++) {
+            Collection<ClusterNode> affNodes = aff.nodes(part, nodes, 0);
+
+            assertEquals(1, affNodes.size());
+
+            ClusterNode node = F.first(affNodes);
+
+            parts.put(node, parts.get(node) != null ? parts.get(node) + 1 : 1); // Count partitions per node.
+        }
+
+        int min = Integer.MAX_VALUE;
+        int max = Integer.MIN_VALUE;
+        int total = 0;
+
+        float mean = 0;
+        float m2 = 0;
+        int n = 0;
+
+        for (ClusterNode node : nodes) {
+            int partsCnt = parts.get(node) != null ? parts.get(node) : 0; // Nodes that own no partition count as 0.
+
+            total += partsCnt;
+
+            if (partsCnt < min)
+                min = partsCnt;
+
+            if (partsCnt > max)
+                max = partsCnt;
+
+            n++; // Incremental (Welford-style) update of the mean and the sum of squared deviations.
+            float delta = partsCnt - mean;
+            mean += delta / n;
+            m2 += delta * (partsCnt - mean);
+        }
+
+        m2 /= (n - 1); // Sample variance; its square root is printed as "Dev".
+        assertEquals(aff.getPartitions(), total);
+
+        System.out.printf("%6s, %6s, %6s, %6s, %8.4f\n", nodes.size(),
+            F.first(nodes).attribute(DFLT_REPLICA_COUNT_ATTR_NAME), min, max, 
Math.sqrt(m2));
+    }
+
+    /**
+     * Rich node stub to use in emulated server topology.
+     */
+    private static class TestRichNode extends GridTestNode {
+        /** Node id. */
+        private final UUID nodeId;
+
+        /** Value returned for the {@code DFLT_REPLICA_COUNT_ATTR_NAME} attribute. */
+        private final int replicas;
+
+        /**
+         * Constructs rich node stub with a random node id and the given replica count.
+         */
+        @SuppressWarnings("UnusedDeclaration")
+        private TestRichNode(int replicas) {
+            this(UUID.randomUUID(), replicas);
+        }
+
+        /**
+         * Constructs rich node stub to use in emulated server topology.
+         * @param nodeId Node id.
+         * @param replicas Value returned for the {@code DFLT_REPLICA_COUNT_ATTR_NAME} attribute.
+         */
+        private TestRichNode(UUID nodeId, int replicas) {
+            this.nodeId = nodeId;
+            this.replicas = replicas;
+        }
+
+        /**
+         * Unused public no-arg constructor for externalizable support; always throws.
+         */
+        public TestRichNode() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public UUID id() {
+            return nodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T> T attribute(String name) {
+            if (DFLT_REPLICA_COUNT_ATTR_NAME.equals(name))
+                return (T)Integer.valueOf(replicas); // valueOf instead of the deprecated Integer constructor.
+
+            return super.attribute(name);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
new file mode 100644
index 0000000..5d51126
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+import static org.apache.ignite.internal.GridTopic.*;
+
+/**
+ *
+ */
+public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int GRID_CNT = 3;
+
+    /** */
+    private static final String KEY = "key";
+
+    /** */
+    private static final int VAL = 1;
+
+    /** */
+    private static final AtomicBoolean received = new AtomicBoolean();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setDiscoverySpi(discoverySpi());
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        return cfg;
+    }
+
+    /**
+     * @return Discovery SPI;
+     */
+    private DiscoverySpi discoverySpi() {
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(IP_FINDER);
+
+        return spi;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(PARTITIONED);
+        cc.setBackups(1);
+        cc.setPreloadMode(SYNC);
+        cc.setWriteSynchronizationMode(FULL_SYNC);
+        cc.setSwapEnabled(true);
+        cc.setEvictNearSynchronized(false);
+        cc.setEvictSynchronized(false);
+        cc.setDistributionMode(PARTITIONED_ONLY);
+
+        return cc;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(GRID_CNT);
+
+        prepare();
+    }
+
+    @Override protected void beforeTest() throws Exception {
+        received.set(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetFromPrimaryNode() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++) {
+            GridCache<String, Integer> c = grid(i).cache(null);
+
+            GridCacheEntry<String, Integer> e = c.entry(KEY);
+
+            if (e.primary()) {
+                info("Primary node: " + grid(i).localNode().id());
+
+                c.get(KEY);
+
+                break;
+            }
+        }
+
+        assert !await();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetFromBackupNode() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++) {
+            GridCache<String, Integer> c = grid(i).cache(null);
+
+            GridCacheEntry<String, Integer> e = c.entry(KEY);
+
+            if (e.backup()) {
+                info("Backup node: " + grid(i).localNode().id());
+
+                Integer val = c.get(KEY);
+
+                assert val != null && val == 1;
+
+                assert !await();
+
+                assert c.evict(KEY);
+
+                assert c.peek(KEY) == null;
+
+                val = c.get(KEY);
+
+                assert val != null && val == 1;
+
+                assert !await();
+
+                break;
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetFromNearNode() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++) {
+            GridCache<String, Integer> c = grid(i).cache(null);
+
+            GridCacheEntry<String, Integer> e = c.entry(KEY);
+
+            if (!e.primary() && !e.backup()) {
+                info("Near node: " + grid(i).localNode().id());
+
+                Integer val = c.get(KEY);
+
+                assert val != null && val == 1;
+
+                break;
+            }
+        }
+
+        assert await();
+    }
+
+    /**
+     * @return {@code True} if awaited message.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"BusyWait"})
+    private boolean await() throws Exception {
+        info("Checking flag: " + System.identityHashCode(received));
+
+        for (int i = 0; i < 3; i++) {
+            if (received.get())
+                return true;
+
+            info("Flag is false.");
+
+            Thread.sleep(500);
+        }
+
+        return received.get();
+    }
+
+    /**
+     * Puts value to primary node and registers listener
+     * that sets {@link #received} flag to {@code true}
+     * if {@link GridNearGetRequest} was received on primary node.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("deprecation")
+    private void prepare() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++) {
+            Ignite g = grid(i);
+
+            GridCacheEntry<String, Integer> e = g.<String, 
Integer>cache(null).entry(KEY);
+
+            if (e.primary()) {
+                info("Primary node: " + g.cluster().localNode().id());
+
+                // Put value.
+                g.cache(null).put(KEY, VAL);
+
+                // Register listener.
+                ((GridKernal)g).context().io().addMessageListener(
+                    TOPIC_CACHE,
+                    new GridMessageListener() {
+                        @Override public void onMessage(UUID nodeId, Object 
msg) {
+                            info("Received message from node [nodeId=" + 
nodeId + ", msg=" + msg + ']');
+
+                            if (msg instanceof GridNearGetRequest) {
+                                info("Setting flag: " + 
System.identityHashCode(received));
+
+                                received.set(true);
+                            }
+                        }
+                    }
+                );
+
+                break;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java
new file mode 100644
index 0000000..7cd7102
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+
+/**
+ * Affinity test for a partitioned cache: the node a key maps to must be the same
+ * no matter which grid instance, or cluster group obtained from it, is asked.
+ */
+@SuppressWarnings({"PointlessArithmeticExpression"})
+public class GridCachePartitionedProjectionAffinitySelfTest extends GridCommonAbstractTest {
+    /** Number of backups configured for the cache. */
+    private static final int BACKUPS = 1;
+
+    /** Number of grids started before each test. */
+    private static final int GRIDS = 3;
+
+    /** IP finder handed to the discovery SPI of every grid configuration. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // Discovery.
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        // Cache.
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(PARTITIONED);
+        cc.setDistributionMode(NEAR_PARTITIONED);
+        cc.setAtomicityMode(TRANSACTIONAL);
+        cc.setBackups(BACKUPS);
+        cc.setPreloadMode(SYNC);
+        cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+
+        igniteCfg.setCacheConfiguration(cc);
+        igniteCfg.setDiscoverySpi(discoSpi);
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(GRIDS);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Compares key-to-node mapping of the first two grids for keys {@code 0..99}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAffinity() throws Exception {
+        waitTopologyUpdate();
+
+        Ignite first = grid(0);
+        Ignite second = grid(1);
+
+        for (int key = 0; key < 100; key++) {
+            Object expId = first.cluster().mapKeyToNode(null, key).id();
+            Object actId = second.cluster().mapKeyToNode(null, key).id();
+
+            assertEquals(expId, actId);
+        }
+    }
+
+    /**
+     * Compares key-to-node mapping obtained through two cluster groups, one pinned to the
+     * first grid and one pinned to the first and second grids, for keys {@code 0..99}.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("deprecation")
+    public void testProjectionAffinity() throws Exception {
+        waitTopologyUpdate();
+
+        Ignite first = grid(0);
+        Ignite second = grid(1);
+
+        ClusterGroup firstOnly = first.cluster().forNodeIds(F.asList(first.cluster().localNode().id()));
+
+        ClusterGroup firstAndSecond = second.cluster().forNodeIds(
+            F.asList(first.cluster().localNode().id(), second.cluster().localNode().id()));
+
+        for (int key = 0; key < 100; key++) {
+            Object expId = firstOnly.ignite().cluster().mapKeyToNode(null, key).id();
+            Object actId = firstAndSecond.ignite().cluster().mapKeyToNode(null, key).id();
+
+            assertEquals(expId, actId);
+        }
+    }
+
+    /**
+     * Delegates to {@link GridTestUtils#waitTopologyUpdate} for the default cache.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("BusyWait")
+    private void waitTopologyUpdate() throws Exception {
+        GridTestUtils.waitTopologyUpdate(null, BACKUPS, log());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
new file mode 100644
index 0000000..57807e4
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Test that in {@link GridCacheMode#PARTITIONED} mode cache writes values only to the near cache store.
+ * <p>
+ * This check is needed because the current implementation of {@link GridCacheWriteBehindStore}
+ * assumes that the user store is wrapped only in near cache (see {@link GridCacheProcessor} init logic).
+ * The test counts store writes and deletes and expects exactly one of each per key.
+ */
+@SuppressWarnings({"unchecked"})
+public class GridCachePartitionedWritesTest extends GridCommonAbstractTest {
+    /** Store installed into the cache configuration; must be set before a grid is started. */
+    private CacheStore store;
+
+    /** {@inheritDoc} */
+    @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(GridCacheMode.PARTITIONED);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setSwapEnabled(false);
+
+        // The test must have installed its store before starting the grid.
+        assert store != null;
+
+        cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setWriteThrough(true);
+        cacheCfg.setLoadPreviousValue(true);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        store = null;
+
+        super.afterTest();
+    }
+
+    /**
+     * Puts and then removes ten entries, each batch in its own transaction, and checks
+     * that the store saw exactly ten writes and ten deletes.
+     *
+     * @throws Exception If test fails.
+     */
+    public void testWrite() throws Exception {
+        final AtomicInteger writeCnt = new AtomicInteger();
+        final AtomicInteger delCnt = new AtomicInteger();
+
+        store = new CacheStoreAdapter<Object, Object>() {
+            @Override public Object load(Object key) {
+                info(">>> Get [key=" + key + ']');
+
+                return null;
+            }
+
+            @Override public void write(Cache.Entry<? extends Object, ? extends Object> entry) {
+                writeCnt.incrementAndGet();
+            }
+
+            @Override public void delete(Object key) {
+                delCnt.incrementAndGet();
+            }
+        };
+
+        startGrid();
+
+        GridCache<Integer, String> cache = cache();
+
+        try {
+            cache.get(1);
+
+            // Phase 1: transactional puts.
+            IgniteTx putTx = cache.txStart();
+
+            try {
+                for (int key = 1; key <= 10; key++)
+                    cache.putx(key, Integer.toString(key));
+
+                putTx.commit();
+            }
+            finally {
+                putTx.close();
+            }
+
+            assert cache.size() == 10;
+
+            assert writeCnt.get() == 10;
+
+            // Phase 2: transactional removes.
+            IgniteTx rmvTx = cache.txStart();
+
+            try {
+                for (int key = 1; key <= 10; key++) {
+                    String prev = cache.remove(key);
+
+                    assert prev != null && prev.equals(Integer.toString(key));
+                }
+
+                rmvTx.commit();
+            }
+            finally {
+                rmvTx.close();
+            }
+
+            assert delCnt.get() == 10;
+        }
+        finally {
+            stopGrid();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java
new file mode 100644
index 0000000..7382503
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Tests entry evictions on a partitioned cache with synchronized evictions enabled:
+ * first while a second node joins (with preloading artificially slowed down), then on
+ * constant topology. After each phase the two caches are expected to have the same size
+ * and every key still present in the first cache must be contained in the second one.
+ */
+public class GridCachePreloadingEvictionsSelfTest extends 
GridCommonAbstractTest {
+    /** Value prefix used for all cache entries (see {@link #createValue()}). */
+    private static final String VALUE = createValue();
+
+    /** IP finder handed to the discovery SPI of every grid configuration. */
+    private final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Generator of the value passed as the {@code IDX_ATTR} user attribute of each created configuration. */
+    private final AtomicInteger idxGen = new AtomicInteger();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(spi);
+
+        CacheConfiguration partCacheCfg = defaultCacheConfiguration();
+
+        partCacheCfg.setCacheMode(PARTITIONED);
+        partCacheCfg.setAffinity(new GridCacheModuloAffinityFunction(1, 1));
+        partCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        partCacheCfg.setDistributionMode(PARTITIONED_ONLY);
+        partCacheCfg.setEvictSynchronized(true);
+        partCacheCfg.setSwapEnabled(false);
+        partCacheCfg.setEvictionPolicy(null);
+        partCacheCfg.setEvictSynchronizedKeyBufferSize(25);
+        partCacheCfg.setEvictMaxOverflowRatio(0.99f);
+        partCacheCfg.setPreloadMode(ASYNC);
+        partCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        // This test requires artificial slowing down of the preloading.
+        partCacheCfg.setPreloadThrottle(2000);
+
+        cfg.setCacheConfiguration(partCacheCfg);
+
+        
cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, 
idxGen.getAndIncrement()));
+
+        cfg.setNetworkTimeout(60000);
+
+        return cfg;
+    }
+
+    /**
+     * Populates the first grid, evicts random entries from it while the second grid
+     * is joining, then evicts again on constant topology, checking cache consistency
+     * after each phase.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("BusyWait")
+    public void testEvictions() throws Exception {
+        try {
+            final Ignite ignite1 = startGrid(1);
+
+            GridCache<Integer, Object> cache1 = ignite1.cache(null);
+
+            for (int i = 0; i < 5000; i++)
+                cache1.put(i, VALUE + i);
+
+            info("Finished data population.");
+
+            // Set once the second grid has started; tells the evicting thread to stop.
+            final AtomicBoolean done = new AtomicBoolean();
+
+            // Released by the node-joined listener below; holds the evicting thread until then.
+            final CountDownLatch startLatch = new CountDownLatch(1);
+
+            int oldSize = cache1.size();
+
+            IgniteFuture fut = multithreadedAsync(
+                new Callable<Object>() {
+                    @Nullable @Override public Object call() throws Exception {
+                        startLatch.await();
+
+                        info("Started evicting...");
+
+                        for (int i = 0; i < 3000 && !done.get(); i++) {
+                            GridCacheEntry<Integer, Object> entry = 
randomEntry(ignite1);
+
+                            if (entry != null)
+                                entry.evict();
+                            else
+                                info("Entry is null.");
+                        }
+
+                        info("Finished evicting.");
+
+                        return null;
+                    }
+                },
+                1);
+
+            // Listener is registered before the second grid is started, so the join event opens the latch.
+            ignite1.events().localListen(
+                new IgnitePredicate<IgniteEvent>() {
+                    @Override public boolean apply(IgniteEvent evt) {
+                        startLatch.countDown();
+
+                        return true;
+                    }
+                },
+                EVT_NODE_JOINED);
+
+            final Ignite ignite2 = startGrid(2);
+
+            done.set(true);
+
+            fut.get();
+
+            sleepUntilCashesEqualize(ignite1, ignite2, oldSize);
+
+            checkCachesConsistency(ignite1, ignite2);
+
+            oldSize = cache1.size();
+
+            info("Evicting on constant topology.");
+
+            for (int i = 0; i < 1000; i++) {
+                GridCacheEntry<Integer, Object> entry = randomEntry(ignite1);
+
+                if (entry != null)
+                    entry.evict();
+                else
+                    info("Entry is null.");
+            }
+
+            sleepUntilCashesEqualize(ignite1, ignite2, oldSize);
+
+            checkCachesConsistency(ignite1, ignite2);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Waits until cache stabilizes on new value, i.e. until the size of the first cache
+     * differs from {@code oldSize} and equals the size of the second cache. Fails the test
+     * if that does not happen within the test timeout.
+     *
+     * @param ignite1 Grid 1.
+     * @param ignite2 Grid 2.
+     * @param oldSize Old size; the awaited stable size must be different from it.
+     * @throws org.apache.ignite.IgniteInterruptedException If interrupted.
+     */
+    private void sleepUntilCashesEqualize(final Ignite ignite1, final Ignite 
ignite2, final int oldSize)
+        throws IgniteInterruptedException {
+        info("Sleeping...");
+
+        assertTrue(GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                int size1 = ignite1.cache(null).size();
+                return size1 != oldSize && size1 == ignite2.cache(null).size();
+            }
+        }, getTestTimeout()));
+
+        info("Sleep finished.");
+    }
+
+    /**
+     * @param g Grid.
+     * @return Random entry from cache.
+     */
+    @Nullable private GridCacheEntry<Integer, Object> randomEntry(Ignite g) {
+        GridKernal g1 = (GridKernal)g;
+
+        return g1.<Integer, Object>internalCache().randomEntry();
+    }
+
+    /**
+     * Checks that both internal caches have the same size (retrying up to three times with
+     * a one second pause) and that every key of cache 1 whose value can be peeked is
+     * contained in cache 2.
+     *
+     * @param ignite1 Grid 1.
+     * @param ignite2 Grid 2.
+     * @throws Exception If failed.
+     */
+    private void checkCachesConsistency(Ignite ignite1, Ignite ignite2) throws 
Exception {
+        GridKernal g1 = (GridKernal) ignite1;
+        GridKernal g2 = (GridKernal) ignite2;
+
+        GridCacheAdapter<Integer, Object> cache1 = g1.internalCache();
+        GridCacheAdapter<Integer, Object> cache2 = g2.internalCache();
+
+        // Give the caches a few extra seconds to converge before asserting.
+        for (int i = 0; i < 3; i++) {
+            if (cache1.size() != cache2.size()) {
+                U.warn(log, "Sizes do not match (will retry in 1000 ms) [s1=" 
+ cache1.size() +
+                    ", s2=" + cache2.size() + ']');
+
+                U.sleep(1000);
+            }
+            else
+                break;
+        }
+
+        info("Cache1 size: " + cache1.size());
+        info("Cache2 size: " + cache2.size());
+
+        assert cache1.size() == cache2.size() : "Sizes do not match [s1=" + 
cache1.size() +
+            ", s2=" + cache2.size() + ']';
+
+        for (Integer key : cache1.keySet()) {
+            Object e = cache1.peek(key);
+
+            // Keys whose value cannot be peeked in cache 1 are not checked.
+            if (e != null)
+                assert cache2.containsKey(key, null) : "Cache2 does not 
contain key: " + key;
+        }
+    }
+
+    /**
+     * Builds the value prefix by appending {@code "val1"} 64 times.
+     *
+     * @return Large value for test.
+     */
+    private static String createValue() {
+        SB sb = new SB(1024);
+
+        for (int i = 0; i < 64; i++)
+            sb.a("val1");
+
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java
new file mode 100644
index 0000000..b311058
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java
@@ -0,0 +1,707 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import com.google.common.collect.*;
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.failover.*;
+import org.apache.ignite.spi.failover.always.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Tests putAll() method along with failover and different configurations.
+ */
+public class GridCachePutAllFailoverSelfTest extends GridCommonAbstractTest {
+    /** IP finder handed to the discovery SPI of every grid configured by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Size of the test map, i.e. the number of keys generated and put into the cache. */
+    private static final int TEST_MAP_SIZE = 100000;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "partitioned";
+
+    /** Size of data chunk (number of keys), sent to a remote node in one task. */
+    private static final int DATA_CHUNK_SIZE = 1000;
+
+    /** Chunk number starting from which worker nodes are shut down. */
+    public static final int FAIL_ON_CHUNK_NO = (TEST_MAP_SIZE / 
DATA_CHUNK_SIZE) / 3;
+
+    /** Timeout, in seconds, of the wait for all submitted tasks to complete. */
+    public static final int AWAIT_TIMEOUT_SEC = 65;
+
+    /** Number of chunks that must be pushed after a worker shutdown before the next worker is shut down. */
+    private static final int FAILOVER_PUSH_GAP = 30;
+
+    /** Master node name. */
+    private static final String MASTER = "master";
+
+    /** Near enabled flag; set by the scenario methods and read when worker caches are configured. */
+    private boolean nearEnabled;
+
+    /** Backups count; set by the scenario methods and read when worker caches are configured. */
+    private int backups;
+
+    /** Filter to include only worker nodes, i.e. nodes whose "segment" attribute equals "worker". */
+    private static final IgnitePredicate<ClusterNode> workerNodesFilter = new 
PN() {
+        @SuppressWarnings("unchecked")
+        @Override public boolean apply(ClusterNode n) {
+             return "worker".equals(n.attribute("segment"));
+        }
+    };
+
+    /**
+     * Result future queue (the queue size is restricted
+     * to 50 in order to prevent the in-memory data grid from overloading:
+     * submitting a task blocks while 50 futures are still pending).
+     */
+    private final BlockingQueue<ComputeTaskFuture<?>> resQueue = new 
LinkedBlockingQueue<>(50);
+
+    /** Test failover SPI, created with the worker-node filter and installed on the master node. */
+    private MasterFailoverSpi failoverSpi = new 
MasterFailoverSpi((IgnitePredicate)workerNodesFilter);
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverColocatedNearEnabledThreeBackups() throws 
Exception {
+        checkPutAllFailoverColocated(true, 7, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverColocatedNearDisabledThreeBackups() throws 
Exception {
+        checkPutAllFailoverColocated(false, 7, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverNearEnabledOneBackup() throws Exception {
+        checkPutAllFailover(true, 3, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverNearDisabledOneBackup() throws Exception {
+        checkPutAllFailover(false, 3, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverNearEnabledTwoBackups() throws Exception {
+        checkPutAllFailover(true, 5, 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverNearDisabledTwoBackups() throws Exception {
+        checkPutAllFailover(false, 5, 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverNearEnabledThreeBackups() throws Exception {
+        checkPutAllFailover(true, 7, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverNearDisabledThreeBackups() throws Exception {
+        checkPutAllFailover(false, 7, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverColocatedNearEnabledOneBackup() throws 
Exception {
+        checkPutAllFailoverColocated(true, 3, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverColocatedNearDisabledOneBackup() throws 
Exception {
+        checkPutAllFailoverColocated(false, 3, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverColocatedNearEnabledTwoBackups() throws 
Exception {
+        checkPutAllFailoverColocated(true, 5, 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllFailoverColocatedNearDisabledTwoBackups() throws 
Exception {
+        checkPutAllFailoverColocated(false, 5, 2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return super.getTestTimeout() * 5;
+    }
+
+    /**
+     * Tests putAll() method along with failover and cache backup.
+     *
+     * Starts a master and {@code workerCnt} workers, splits the test keys into chunks of
+     * {@code DATA_CHUNK_SIZE} keys and submits one {@code GridCachePutAllTask} per chunk,
+     * passing a randomly chosen running worker as the preferred node. Starting with chunk
+     * number {@code FAIL_ON_CHUNK_NO}, workers are stopped one at a time, with
+     * {@code FAILOVER_PUSH_GAP} chunks pushed between two stops, until {@code shutdownCnt}
+     * workers are down.
+     *
+     * After all tasks have completed (or the wait has timed out) checks that no test key is
+     * absent and that the resulting primary cache size, summed over the running workers,
+     * is the same as expected. Job failures are asserted only if the wait did not time out.
+     * All grids are stopped at the end.
+     *
+     * @param near Near enabled.
+     * @param workerCnt Worker count.
+     * @param shutdownCnt Shutdown count; also used as the number of cache backups.
+     * @throws Exception If failed.
+     */
+    public void checkPutAllFailover(boolean near, int workerCnt, int 
shutdownCnt) throws Exception {
+        nearEnabled = near;
+        backups = shutdownCnt; // Configure as many backups as nodes to be stopped.
+
+        Collection<Integer> testKeys = generateTestKeys();
+
+        final Ignite master = startGrid(MASTER);
+
+        List<Ignite> workers = new ArrayList<>(workerCnt);
+
+        for (int i = 1; i <= workerCnt; i++)
+            workers.add(startGrid("worker" + i));
+
+        info("Master: " + master.cluster().localNode().id());
+
+        List<Ignite> runningWorkers = new ArrayList<>(workerCnt);
+
+        for (int i = 1; i <= workerCnt; i++) {
+            UUID id = workers.get(i - 1).cluster().localNode().id();
+
+            info(String.format("Worker%d - %s", i, id));
+
+            runningWorkers.add(workers.get(i - 1));
+        }
+
+        try {
+            // Dummy call to fetch affinity function from remote node
+            master.cluster().mapKeyToNode(CACHE_NAME, "Dummy");
+
+            Random rnd = new Random();
+
+            Collection<Integer> dataChunk = new ArrayList<>(DATA_CHUNK_SIZE);
+            int entryCntr = 0;
+            int chunkCntr = 0;
+            final AtomicBoolean jobFailed = new AtomicBoolean(false);
+
+            int failoverPushGap = 0; // Chunks left to push before the next shutdown.
+
+            final CountDownLatch emptyLatch = new CountDownLatch(1);
+
+            final AtomicBoolean inputExhausted = new AtomicBoolean();
+
+            IgniteCompute comp = 
compute(master.cluster().forPredicate(workerNodesFilter)).enableAsync();
+
+            for (Integer key : testKeys) {
+                dataChunk.add(key);
+                entryCntr++;
+
+                if (entryCntr == DATA_CHUNK_SIZE) { // time to send data
+                    chunkCntr++;
+
+                    assert dataChunk.size() == DATA_CHUNK_SIZE;
+
+                    log.info("Pushing data chunk [chunkNo=" + chunkCntr + "]");
+
+                    comp.execute(
+                        new GridCachePutAllTask(
+                            
runningWorkers.get(rnd.nextInt(runningWorkers.size())).cluster().localNode().id(),
+                            CACHE_NAME),
+                            dataChunk);
+
+                    ComputeTaskFuture<Void> fut = comp.future();
+
+                    resQueue.put(fut); // Blocks if queue is full.
+
+                    fut.listenAsync(new CI1<IgniteFuture<Void>>() {
+                        @Override public void apply(IgniteFuture<Void> f) {
+                            ComputeTaskFuture<?> taskFut = 
(ComputeTaskFuture<?>)f;
+
+                            try {
+                                taskFut.get(); //if something went wrong - 
we'll get exception here
+                            }
+                            catch (IgniteCheckedException e) {
+                                log.error("Job failed", e);
+
+                                jobFailed.set(true);
+                            }
+
+                            // Remove complete future from queue to allow 
other jobs to proceed.
+                            resQueue.remove(taskFut);
+
+                            if (inputExhausted.get() && resQueue.isEmpty())
+                                emptyLatch.countDown();
+                        }
+                    });
+
+                    entryCntr = 0;
+                    dataChunk = new ArrayList<>(DATA_CHUNK_SIZE);
+
+                    if (chunkCntr >= FAIL_ON_CHUNK_NO) { // Shutdown phase has started.
+                        if (workerCnt - runningWorkers.size() < shutdownCnt) { // More workers to stop.
+                            if (failoverPushGap > 0)
+                                failoverPushGap--;
+                            else {
+                                Ignite victim = runningWorkers.remove(0);
+
+                                info("Shutting down node: " + 
victim.cluster().localNode().id());
+
+                                stopGrid(victim.name());
+
+                                // Fail next node after some jobs have been 
pushed.
+                                failoverPushGap = FAILOVER_PUSH_GAP;
+                            }
+                        }
+                    }
+                }
+            }
+
+            inputExhausted.set(true); // From now on listeners may release the latch.
+
+            if (resQueue.isEmpty())
+                emptyLatch.countDown(); // Every submitted task has already completed.
+
+            assert chunkCntr == TEST_MAP_SIZE / DATA_CHUNK_SIZE;
+
+            // Wait for queue to empty.
+            log.info("Waiting for empty queue...");
+
+            boolean failedWait = false;
+
+            if (!emptyLatch.await(AWAIT_TIMEOUT_SEC, TimeUnit.SECONDS)) {
+                info(">>> Failed to wait for queue to empty.");
+
+                failedWait = true;
+            }
+
+            if (!failedWait)
+                assertFalse("One or more jobs have failed.", jobFailed.get());
+
+            Collection<Integer> absentKeys = 
findAbsentKeys(runningWorkers.get(0), testKeys);
+
+            if (!failedWait && !absentKeys.isEmpty()) {
+                // Give some time to preloader.
+                U.sleep(15000);
+
+                absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys);
+            }
+
+            info(">>> Absent keys: " + absentKeys);
+
+            assertTrue(absentKeys.isEmpty());
+
+            // Actual primary cache size.
+            int primaryCacheSize = 0;
+
+            for (Ignite g : runningWorkers) {
+                info(">>>>> " + g.cache(CACHE_NAME).size());
+
+                primaryCacheSize += g.cache(CACHE_NAME).primarySize();
+            }
+
+            assertEquals(TEST_MAP_SIZE, primaryCacheSize);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Tests putAll() method along with failover and cache backup, sending every key to the
+     * node it is mapped to.
+     *
+     * Starts a master and {@code workerCnt} workers and groups the test keys by the ID of
+     * the node returned by {@code mapKeyToNode()}. As soon as a group reaches
+     * {@code DATA_CHUNK_SIZE} keys it is submitted as a {@code GridCachePutAllTask} that
+     * prefers that node; whatever is left in the groups is submitted after the last key.
+     * Starting with chunk number {@code FAIL_ON_CHUNK_NO}, workers are stopped one at a time,
+     * with {@code FAILOVER_PUSH_GAP} chunks pushed between two stops, until
+     * {@code shutdownCnt} workers are down.
+     *
+     * After all tasks have completed (or the wait has timed out) checks that no test key is
+     * absent and that the resulting primary cache size, summed over the running workers,
+     * is the same as expected. Job failures are asserted only if the wait did not time out.
+     * All grids are stopped at the end.
+     *
+     * @param near Near enabled.
+     * @param workerCnt Worker count.
+     * @param shutdownCnt Shutdown count; also used as the number of cache backups.
+     * @throws Exception If failed.
+     */
+    public void checkPutAllFailoverColocated(boolean near, int workerCnt, int 
shutdownCnt) throws Exception {
+        nearEnabled = near;
+        backups = shutdownCnt; // Configure as many backups as nodes to be stopped.
+
+        Collection<Integer> testKeys = generateTestKeys();
+
+        final Ignite master = startGrid(MASTER);
+
+        List<Ignite> workers = new ArrayList<>(workerCnt);
+
+        for (int i = 1; i <= workerCnt; i++)
+            workers.add(startGrid("worker" + i));
+
+        info("Master: " + master.cluster().localNode().id());
+
+        List<Ignite> runningWorkers = new ArrayList<>(workerCnt);
+
+        for (int i = 1; i <= workerCnt; i++) {
+            UUID id = workers.get(i - 1).cluster().localNode().id();
+
+            info(String.format("Worker%d: %s", i, id));
+
+            runningWorkers.add(workers.get(i - 1));
+        }
+
+        try {
+            // Dummy call to fetch affinity function from remote node
+            master.cluster().mapKeyToNode(CACHE_NAME, "Dummy");
+
+            Map<UUID, Collection<Integer>> dataChunks = new HashMap<>(); // Pending keys per mapped node ID.
+
+            int chunkCntr = 0;
+            final AtomicBoolean jobFailed = new AtomicBoolean(false);
+
+            int failoverPushGap = 0; // Chunks left to push before the next shutdown.
+
+            final CountDownLatch emptyLatch = new CountDownLatch(1);
+
+            final AtomicBoolean inputExhausted = new AtomicBoolean();
+
+            IgniteCompute comp = 
compute(master.cluster().forPredicate(workerNodesFilter)).enableAsync();
+
+            for (Integer key : testKeys) {
+                ClusterNode mappedNode = 
master.cluster().mapKeyToNode(CACHE_NAME, key);
+
+                UUID nodeId = mappedNode.id();
+
+                Collection<Integer> data = dataChunks.get(nodeId);
+
+                if (data == null) {
+                    data = new ArrayList<>(DATA_CHUNK_SIZE);
+
+                    dataChunks.put(nodeId, data);
+                }
+
+                data.add(key);
+
+                if (data.size() == DATA_CHUNK_SIZE) { // time to send data
+                    chunkCntr++;
+
+                    log.info("Pushing data chunk [chunkNo=" + chunkCntr + "]");
+
+                    comp.execute(new GridCachePutAllTask(nodeId, CACHE_NAME), 
data);
+
+                    ComputeTaskFuture<Void> fut = comp.future();
+
+                    resQueue.put(fut); // Blocks if queue is full.
+
+                    fut.listenAsync(new CI1<IgniteFuture<Void>>() {
+                        @Override public void apply(IgniteFuture<Void> f) {
+                            ComputeTaskFuture<?> taskFut = 
(ComputeTaskFuture<?>)f;
+
+                            try {
+                                taskFut.get(); //if something went wrong - 
we'll get exception here
+                            }
+                            catch (IgniteCheckedException e) {
+                                log.error("Job failed", e);
+
+                                jobFailed.set(true);
+                            }
+
+                            // Remove complete future from queue to allow 
other jobs to proceed.
+                            resQueue.remove(taskFut);
+
+                            if (inputExhausted.get() && resQueue.isEmpty())
+                                emptyLatch.countDown();
+                        }
+                    });
+
+                    data = new ArrayList<>(DATA_CHUNK_SIZE); // Start a fresh chunk for this node.
+
+                    dataChunks.put(nodeId, data);
+
+                    if (chunkCntr >= FAIL_ON_CHUNK_NO) { // Shutdown phase has started.
+                        if (workerCnt - runningWorkers.size() < shutdownCnt) { // More workers to stop.
+                            if (failoverPushGap > 0)
+                                failoverPushGap--;
+                            else {
+                                Ignite victim = runningWorkers.remove(0);
+
+                                info("Shutting down node: " + 
victim.cluster().localNode().id());
+
+                                stopGrid(victim.name());
+
+                                // Fail next node after some jobs have been 
pushed.
+                                failoverPushGap = FAILOVER_PUSH_GAP;
+                            }
+                        }
+                    }
+                }
+            }
+
+            for (Map.Entry<UUID, Collection<Integer>> entry : 
dataChunks.entrySet()) { // Submit the remainders (a remainder may be empty).
+                comp.execute(new GridCachePutAllTask(entry.getKey(), 
CACHE_NAME), entry.getValue());
+
+                ComputeTaskFuture<Void> fut = comp.future();
+
+                resQueue.put(fut); // Blocks if queue is full.
+
+                fut.listenAsync(new CI1<IgniteFuture<Void>>() {
+                    @Override public void apply(IgniteFuture<Void> f) {
+                        ComputeTaskFuture<?> taskFut = (ComputeTaskFuture<?>)f;
+
+                        try {
+                            taskFut.get(); //if something went wrong - we'll 
get exception here
+                        }
+                        catch (IgniteCheckedException e) {
+                            log.error("Job failed", e);
+
+                            jobFailed.set(true);
+                        }
+
+                        // Remove complete future from queue to allow other 
jobs to proceed.
+                        resQueue.remove(taskFut);
+
+                        if (inputExhausted.get() && resQueue.isEmpty())
+                            emptyLatch.countDown();
+                    }
+                });
+            }
+
+            inputExhausted.set(true); // From now on listeners may release the latch.
+
+            if (resQueue.isEmpty())
+                emptyLatch.countDown(); // Every submitted task has already completed.
+
+            // Wait for queue to empty.
+            log.info("Waiting for empty queue...");
+
+            boolean failedWait = false;
+
+            if (!emptyLatch.await(AWAIT_TIMEOUT_SEC, TimeUnit.SECONDS)) {
+                info(">>> Failed to wait for queue to empty.");
+
+                failedWait = true;
+            }
+
+            if (!failedWait)
+                assertFalse("One or more jobs have failed.", jobFailed.get());
+
+            Collection<Integer> absentKeys = 
findAbsentKeys(runningWorkers.get(0), testKeys);
+
+            if (!failedWait && !absentKeys.isEmpty()) {
+                // Give some time to preloader.
+                U.sleep(15000);
+
+                absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys);
+            }
+
+            info(">>> Absent keys: " + absentKeys);
+
+            assertTrue(absentKeys.isEmpty());
+
+            // Actual primary cache size.
+            int primaryCacheSize = 0;
+
+            for (Ignite g : runningWorkers) {
+                info(">>>>> " + g.cache(CACHE_NAME).size());
+
+                primaryCacheSize += g.cache(CACHE_NAME).primarySize();
+            }
+
+            assertEquals(TEST_MAP_SIZE, primaryCacheSize);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Looks every given key up through the worker's cache and collects the keys that have no value.
+     *
+     * @param workerNode Worker node whose cache is queried.
+     * @param keys Keys that are suspected to be absent.
+     * @return Keys for which {@code get()} returned {@code null}. If no keys are absent, the collection is empty.
+     * @throws IgniteCheckedException If error occurs.
+     */
+    private Collection<Integer> findAbsentKeys(Ignite workerNode,
+        Collection<Integer> keys) throws IgniteCheckedException {
+        Collection<Integer> absent = new ArrayList<>(keys.size());
+
+        GridCache<Object, Object> workerCache = workerNode.cache(CACHE_NAME);
+
+        for (Integer candidate : keys) {
+            boolean present = workerCache.get(candidate) != null;
+
+            if (!present)
+                absent.add(candidate);
+        }
+
+        return absent;
+    }
+
+    /**
+     * Generates the test keys: the integers from 0 (inclusive) to {@code TEST_MAP_SIZE} (exclusive),
+     * in ascending order.
+     *
+     * @return A test keys collection.
+     */
+    private Collection<Integer> generateTestKeys() {
+        Collection<Integer> keys = new ArrayList<>(TEST_MAP_SIZE);
+
+        int next = 0;
+
+        while (next < TEST_MAP_SIZE)
+            keys.add(next++);
+
+        return keys;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Every grid gets peer class loading disabled, continuous deployment mode and a TCP discovery
+     * SPI using the shared IP finder. On top of that, grids whose name starts with {@code MASTER}
+     * are tagged with segment "master" and get the test failover SPI, while grids whose name starts
+     * with "worker" are tagged with segment "worker" and get the partitioned test cache
+     * {@code CACHE_NAME}, configured from the {@code backups} and {@code nearEnabled} fields.
+     *
+     * @throws IllegalStateException If the grid name has neither prefix.
+     */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        cfg.setDeploymentMode(IgniteDeploymentMode.CONTINUOUS);
+
+        TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
+
+        discoverySpi.setAckTimeout(60000);
+        discoverySpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoverySpi);
+
+        // Use the MASTER constant so the prefix cannot drift from the name passed to startGrid().
+        if (gridName.startsWith(MASTER)) {
+            cfg.setUserAttributes(ImmutableMap.of("segment", "master"));
+
+            // For sure.
+            failoverSpi.setMaximumFailoverAttempts(50);
+
+            cfg.setFailoverSpi(failoverSpi);
+        }
+        else if (gridName.startsWith("worker")) {
+            // The "segment" attribute is what the worker-node filter matches on.
+            cfg.setUserAttributes(ImmutableMap.of("segment", "worker"));
+
+            CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+            // Use the CACHE_NAME constant so the cache is always found by the name the test looks up.
+            cacheCfg.setName(CACHE_NAME);
+            cacheCfg.setCacheMode(GridCacheMode.PARTITIONED);
+            cacheCfg.setStartSize(4500000);
+
+            cacheCfg.setBackups(backups);
+
+            cacheCfg.setStoreValueBytes(true);
+            cacheCfg.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
+            cacheCfg.setQueryIndexEnabled(false);
+
+            cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+            cfg.setCacheConfiguration(cacheCfg);
+        }
+        else
+            throw new IllegalStateException("Unexpected grid name: " + gridName);
+
+        return cfg;
+    }
+
+    /**
+     * Test failover SPI for master node.
+     *
+     * On every failover request it records the job context, clears the job's failed node list
+     * attribute, keeps its own per-job count of failover attempts (bounded by
+     * {@code getMaximumFailoverAttempts()}) and then delegates to the parent SPI with a copy of
+     * the topology processed through the node filter.
+     */
+    @IgniteSpiConsistencyChecked(optional = true)
+    private static class MasterFailoverSpi extends AlwaysFailoverSpi {
+        /** Job context attribute that holds the number of failover attempts counted by this SPI. */
+        private static final String FAILOVER_NUMBER_ATTR = 
"failover:number:attr";
+
+        /** Contexts of all jobs for which {@code failover()} has been called. */
+        private Set<ComputeJobContext> failedOverJobs = new HashSet<>();
+
+        /** Node filter. */
+        private IgnitePredicate<? super ClusterNode>[] filter;
+
+        /** Logger (resource-injected). */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /**
+         * @param filter Filter(s) applied to the topology before it is passed to the parent SPI.
+         */
+        MasterFailoverSpi(IgnitePredicate<? super ClusterNode>... filter) {
+            this.filter = filter;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Returns {@code null} without delegating once the attempt counter stored in the job
+         * context has reached {@code getMaximumFailoverAttempts()}.
+         * NOTE(review): FAILED_NODE_LIST_ATTR is not declared in this class; presumably it is
+         * inherited from the parent SPI - confirm.
+         */
+        @Override public ClusterNode failover(FailoverContext ctx, 
List<ClusterNode> top) {
+            failedOverJobs.add(ctx.getJobResult().getJobContext());
+
+            // Clear failed nodes list - allow to failover on the same node.
+            
ctx.getJobResult().getJobContext().setAttribute(FAILED_NODE_LIST_ATTR, null);
+
+            // Account for maximum number of failover attempts since we clear 
failed node list.
+            Integer failoverCnt = 
ctx.getJobResult().getJobContext().getAttribute(FAILOVER_NUMBER_ATTR);
+
+            if (failoverCnt == null)
+                
ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, 1);
+            else {
+                if (failoverCnt >= getMaximumFailoverAttempts()) {
+                    U.warn(log, "Job failover failed because number of maximum 
failover attempts is exceeded " +
+                        "[failedJob=" + ctx.getJobResult().getJob() + ", 
maxFailoverAttempts=" +
+                        getMaximumFailoverAttempts() + ']');
+
+                    return null;
+                }
+
+                
ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, 
failoverCnt + 1);
+            }
+
+            List<ClusterNode> cp = new ArrayList<>(top);
+
+            // Keep collection type.
+            F.retain(cp, false, new IgnitePredicate<ClusterNode>() {
+                @Override public boolean apply(ClusterNode node) {
+                    return F.isAll(node, filter);
+                }
+            });
+
+            return super.failover(ctx, cp); //use cp to ensure we don't 
failover on failed node
+        }
+
+        /**
+         * @return Job contexts for failed over jobs (the internal set itself, not a copy).
+         */
+        public Set<ComputeJobContext> getFailedOverJobs() {
+            return failedOverJobs;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllTask.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllTask.java
new file mode 100644
index 0000000..0bc4309
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllTask.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Puts all the passed data into partitioned cache in small chunks.
+ * <p>
+ * The task is mapped to a single job. The job is sent to the preferred node when that node is
+ * part of the task topology and to the first topology node otherwise. It stores every key as its
+ * own value, issuing one {@code putAll()} per {@code TX_BOUND} keys plus a final one for the rest.
+ * A failed job is failed over; the task itself produces no result.
+ */
+class GridCachePutAllTask extends ComputeTaskAdapter<Collection<Integer>, Void> {
+    /** Number of entries per put. */
+    private static final int TX_BOUND = 30;
+
+    /** Preferred node. */
+    private final UUID preferredNode;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /**
+     * @param preferredNode ID of the node that we'd prefer to take from grid.
+     * @param cacheName A name of the cache to work with.
+     */
+    GridCachePutAllTask(UUID preferredNode, String cacheName) {
+        this.preferredNode = preferredNode;
+        this.cacheName = cacheName;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * A {@code null} argument is treated as an empty collection, so the job completes
+     * without putting anything instead of failing with a {@link NullPointerException}.
+     */
+    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable final Collection<Integer> data) throws IgniteCheckedException {
+        assert !subgrid.isEmpty();
+
+        // The argument is declared nullable, so never dereference it directly inside the job.
+        final Collection<Integer> keys = data != null ? data : Collections.<Integer>emptyList();
+
+        // Give preference to wanted node. Otherwise, take the first one.
+        ClusterNode targetNode = F.find(subgrid, subgrid.get(0), new IgnitePredicate<ClusterNode>() {
+            /** {@inheritDoc} */
+            @Override public boolean apply(ClusterNode e) {
+                return preferredNode.equals(e.id());
+            }
+        });
+
+        return Collections.singletonMap(
+            new ComputeJobAdapter() {
+                /** Logger (resource-injected). */
+                @IgniteLoggerResource
+                private IgniteLogger log;
+
+                /** Ignite instance the job runs on (resource-injected). */
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                /** {@inheritDoc} */
+                @Override public Object execute() throws IgniteCheckedException {
+                    log.info("Going to put data: " + data);
+
+                    GridCacheProjection<Object, Object> cache = ignite.cache(cacheName);
+
+                    assert cache != null;
+
+                    HashMap<Integer, Integer> putMap = U.newLinkedHashMap(TX_BOUND);
+
+                    int cnt = 0;
+
+                    for (Integer val : keys) {
+                        putMap.put(val, val);
+
+                        // Flush a full batch and start collecting the next one.
+                        if (++cnt == TX_BOUND) {
+                            log.info("Putting keys to cache: " + putMap.keySet());
+
+                            cache.putAll(putMap);
+
+                            cnt = 0;
+
+                            putMap = U.newLinkedHashMap(TX_BOUND);
+                        }
+                    }
+
+                    assert cnt < TX_BOUND;
+                    assert putMap.size() == (keys.size() % TX_BOUND) : "putMap.size() = " + putMap.size();
+
+                    // Flush the remainder (possibly empty).
+                    log.info("Putting keys to cache: " + putMap.keySet());
+
+                    cache.putAll(putMap);
+
+                    log.info("Finished putting data: " + data);
+
+                    return data;
+                }
+            },
+            targetNode);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Requests failover for a job that ended with an exception and keeps waiting otherwise.
+     */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res,
+        List<ComputeJobResult> rcvd) throws IgniteCheckedException {
+        if (res.getException() != null)
+            return ComputeJobResultPolicy.FAILOVER;
+
+        return ComputeJobResultPolicy.WAIT;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Always {@code null}: the task has no result.
+     */
+    @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
+        return null;
+    }
+}

Reply via email to