http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java
new file mode 100644
index 0000000..fed2812
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.configuration.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Basic test for a cache store configured with write-behind enabled.
+ */
+public abstract class GridCacheWriteBehindStoreAbstractTest extends 
GridCommonAbstractTest {
+    /** Flush frequency. */
+    private static final int WRITE_FROM_BEHIND_FLUSH_FREQUENCY = 1000;
+
+    /** Cache store. */
+    private static final GridCacheTestStore store = new GridCacheTestStore();
+
+    /**
+     * Creates the test, passing {@code true} to the superclass constructor
+     * (the inline comment marks this flag as "start grid").
+     */
+    protected GridCacheWriteBehindStoreAbstractTest() {
+        super(true /*start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // The store is a static instance shared by all tests of this class, so its timestamp is reset up front.
+        store.resetTimestamp();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        GridCache<?, ?> testCache = cache();
+
+        // Clear the cache only when one is available.
+        if (testCache != null)
+            testCache.clearAll();
+
+        // The store is shared between tests, so it is always reset.
+        store.reset();
+    }
+
+    /**
+     * Supplies the cache mode that {@link #getConfiguration()} applies to the cache configuration.
+     *
+     * @return Caching mode.
+     */
+    protected abstract GridCacheMode cacheMode();
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected final IgniteConfiguration getConfiguration() throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration();
+
+        // Discovery SPI backed by a TcpDiscoveryVmIpFinder.
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        // Transactional cache in the mode chosen by the concrete subclass.
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(cacheMode());
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setSwapEnabled(false);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+
+        // The shared test store is plugged in with read-through and write-through enabled.
+        cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setWriteThrough(true);
+        cacheCfg.setLoadPreviousValue(true);
+
+        // Write-behind is enabled with the flush frequency the tests wait on.
+        cacheCfg.setWriteBehindEnabled(true);
+        cacheCfg.setWriteBehindFlushFrequency(WRITE_FROM_BEHIND_FLUSH_FREQUENCY);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** @throws Exception If test fails. */
+    public void testWriteThrough() throws Exception {
+        GridCache<Integer, String> cache = cache();
+
+        Map<Integer, String> map = store.getMap();
+
+        assert map.isEmpty();
+
+        IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ);
+
+        try {
+            for (int i = 1; i <= 10; i++) {
+                cache.putx(i, Integer.toString(i));
+
+                checkLastMethod(null);
+            }
+
+            tx.commit();
+        }
+        finally {
+            tx.close();
+        }
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("putAll");
+
+        assert cache.size() == 10;
+
+        for (int i = 1; i <= 10; i++) {
+            String val = map.get(i);
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        store.resetLastMethod();
+
+        tx = cache.txStart();
+
+        try {
+            for (int i = 1; i <= 10; i++) {
+                String val = cache.remove(i);
+
+                checkLastMethod(null);
+
+                assert val != null;
+                assert val.equals(Integer.toString(i));
+            }
+
+            tx.commit();
+        }
+        finally {
+            tx.close();
+        }
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("removeAll");
+
+        assert map.isEmpty();
+    }
+
+    /** @throws Exception If test failed. */
+    public void testReadThrough() throws Exception {
+        GridCache<Integer, String> cache = cache();
+
+        Map<Integer, String> map = store.getMap();
+
+        assert map.isEmpty();
+
+        try (IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ)) {
+            for (int i = 1; i <= 10; i++)
+                cache.putx(i, Integer.toString(i));
+
+            checkLastMethod(null);
+
+            tx.commit();
+        }
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("putAll");
+
+        for (int i = 1; i <= 10; i++) {
+            String val = map.get(i);
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        cache.clearAll();
+
+        assert cache.isEmpty();
+        assert cache.isEmpty();
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        assert map.size() == 10;
+
+        for (int i = 1; i <= 10; i++) {
+            // Read through.
+            String val = cache.get(i);
+
+            checkLastMethod("load");
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        assert cache.size() == 10;
+
+        cache.clearAll();
+
+        assert cache.isEmpty();
+        assert cache.isEmpty();
+
+        assert map.size() == 10;
+
+        Collection<Integer> keys = new ArrayList<>();
+
+        for (int i = 1; i <= 10; i++)
+            keys.add(i);
+
+        // Read through.
+        Map<Integer, String> vals = cache.getAll(keys);
+
+        checkLastMethod("loadAll");
+
+        assert vals != null;
+        assert vals.size() == 10;
+
+        for (int i = 1; i <= 10; i++) {
+            String val = vals.get(i);
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        // Write through.
+        cache.removeAll(keys);
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("removeAll");
+
+        assert cache.isEmpty();
+        assert cache.isEmpty();
+
+        assert map.isEmpty();
+    }
+
+    /**
+     * Runs workers that randomly put and remove keys until the running flag is cleared, waits for the
+     * write-behind flush and then checks every entry left in the store: its value must be
+     * {@code "val" + key} and at least one worker must still list the key in its own key set.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultithreaded() throws Exception {
+        final ConcurrentMap<String, Set<Integer>> keysByThread = new ConcurrentHashMap<>();
+
+        final AtomicBoolean keepRunning = new AtomicBoolean(true);
+
+        final GridCache<Integer, String> cache = cache();
+
+        Runnable worker = new Runnable() {
+            @SuppressWarnings({"NullableProblems"})
+            @Override public void run() {
+                // Register a key set under this thread's name; reuse the one already registered, if any.
+                Set<Integer> fresh = new HashSet<>();
+
+                Set<Integer> registered = keysByThread.putIfAbsent(Thread.currentThread().getName(), fresh);
+
+                Set<Integer> keys = registered == null ? fresh : registered;
+
+                Random rnd = new Random();
+
+                try {
+                    int keyCnt = 20000;
+
+                    while (keepRunning.get()) {
+                        boolean doPut = rnd.nextInt(2) == 0;
+                        int key = rnd.nextInt(keyCnt);
+
+                        if (doPut) {
+                            cache.put(key, "val" + key);
+                            keys.add(key);
+                        }
+                        else {
+                            cache.remove(key);
+                            keys.remove(key);
+                        }
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    error("Unexpected exception in put thread", e);
+
+                    assert false;
+                }
+            }
+        };
+
+        IgniteFuture<?> fut = multithreadedAsync(worker, 10, "put");
+
+        U.sleep(10000);
+
+        keepRunning.set(false);
+
+        fut.get();
+
+        // Wait several flush periods before inspecting the store.
+        U.sleep(5 * WRITE_FROM_BEHIND_FLUSH_FREQUENCY);
+
+        for (Map.Entry<Integer, String> e : store.getMap().entrySet()) {
+            int key = e.getKey();
+
+            assertEquals("Invalid value for key " + key, "val" + key, e.getValue());
+
+            boolean putBySomeThread = false;
+
+            Iterator<Set<Integer>> it = keysByThread.values().iterator();
+
+            // Stop scanning as soon as one worker is found that holds the key.
+            while (!putBySomeThread && it.hasNext())
+                putBySomeThread = it.next().contains(key);
+
+            assert putBySomeThread : "No threads found that put key " + key;
+        }
+    }
+
+    /**
+     * Asserts that the last method recorded by the store is the expected one.
+     *
+     * @param mtd Expected last method value, or {@code null} if no method is expected to be recorded.
+     */
+    private void checkLastMethod(@Nullable String mtd) {
+        String actual = store.getLastMethod();
+
+        if (mtd == null) {
+            assert actual == null : "Last method must be null: " + actual;
+
+            return;
+        }
+
+        assert actual != null : "Last method must be not null";
+        assert actual.equals(mtd) : "Last method does not match [expected=" + mtd + ", lastMtd=" + actual + ']';
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java
new file mode 100644
index 0000000..1cc35e4
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+
+/**
+ * Tests {@link GridCacheWriteBehindStore} in local configuration: runs the tests inherited from
+ * {@link GridCacheWriteBehindStoreAbstractTest} with {@link GridCacheMode#LOCAL}.
+ */
+public class GridCacheWriteBehindStoreLocalTest extends 
GridCacheWriteBehindStoreAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return GridCacheMode.LOCAL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java
new file mode 100644
index 0000000..4289a03
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Multithreaded tests for {@link GridCacheWriteBehindStore}.
+ */
+public class GridCacheWriteBehindStoreMultithreadedSelfTest extends 
GridCacheWriteBehindStoreAbstractSelfTest {
+    /**
+     * This test performs complex set of operations on store from multiple 
threads.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutGetRemove() throws Exception {
+        initStore(2);
+
+        Set<Integer> exp;
+
+        try {
+            exp = runPutGetRemoveMultithreaded(10, 10);
+        }
+        finally {
+            shutdownStore();
+        }
+
+        Map<Integer, String> map = delegate.getMap();
+
+        Collection<Integer> extra = new HashSet<>(map.keySet());
+
+        extra.removeAll(exp);
+
+        assertTrue("The underlying store contains extra keys: " + extra, 
extra.isEmpty());
+
+        Collection<Integer> missing = new HashSet<>(exp);
+
+        missing.removeAll(map.keySet());
+
+        assertTrue("Missing keys in the underlying store: " + missing, 
missing.isEmpty());
+
+        for (Integer key : exp)
+            assertEquals("Invalid value for key " + key, "val" + key, 
map.get(key));
+    }
+
+    /**
+     * Tests that cache would keep values if underlying store fails.
+     *
+     * @throws Exception If failed.
+     */
+    public void testStoreFailure() throws Exception {
+        delegate.setShouldFail(true);
+
+        initStore(2);
+
+        Set<Integer> exp;
+
+        try {
+            exp = runPutGetRemoveMultithreaded(10, 10);
+
+            U.sleep(FLUSH_FREQUENCY);
+
+            info(">>> There are " + store.getWriteBehindErrorRetryCount() + " 
entries in RETRY state");
+
+            delegate.setShouldFail(false);
+
+            // Despite that we set shouldFail flag to false, flush thread may 
just have caught an exception.
+            // If we move store to the stopping state right away, this value 
will be lost. That's why this sleep
+            // is inserted here to let all exception handlers in write-behind 
store exit.
+            U.sleep(1000);
+        }
+        finally {
+            shutdownStore();
+        }
+
+        Map<Integer, String> map = delegate.getMap();
+
+        Collection<Integer> extra = new HashSet<>(map.keySet());
+
+        extra.removeAll(exp);
+
+        assertTrue("The underlying store contains extra keys: " + extra, 
extra.isEmpty());
+
+        Collection<Integer> missing = new HashSet<>(exp);
+
+        missing.removeAll(map.keySet());
+
+        assertTrue("Missing keys in the underlying store: " + missing, 
missing.isEmpty());
+
+        for (Integer key : exp)
+            assertEquals("Invalid value for key " + key, "val" + key, 
map.get(key));
+    }
+
+    /**
+     * Tests store consistency in case of high put rate, when flush is 
performed from the same thread
+     * as put or remove operation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFlushFromTheSameThread() throws Exception {
+        // 50 milliseconds should be enough.
+        delegate.setOperationDelay(50);
+
+        initStore(2);
+
+        Set<Integer> exp;
+
+        int start = store.getWriteBehindTotalCriticalOverflowCount();
+
+        try {
+            //We will have in total 5 * CACHE_SIZE keys that should be enough 
to grow map size to critical value.
+            exp = runPutGetRemoveMultithreaded(5, CACHE_SIZE);
+        }
+        finally {
+            log.info(">>> Done inserting, shutting down the store");
+
+            shutdownStore();
+        }
+
+        // Restore delay.
+        delegate.setOperationDelay(0);
+
+        Map<Integer, String> map = delegate.getMap();
+
+        int end = store.getWriteBehindTotalCriticalOverflowCount();
+
+        log.info(">>> There are " + exp.size() + " keys in store, " + (end - 
start) + " overflows detected");
+
+        assertTrue("No cache overflows detected (a bug or too few keys or too 
few delay?)", end > start);
+
+        Collection<Integer> extra = new HashSet<>(map.keySet());
+
+        extra.removeAll(exp);
+
+        assertTrue("The underlying store contains extra keys: " + extra, 
extra.isEmpty());
+
+        Collection<Integer> missing = new HashSet<>(exp);
+
+        missing.removeAll(map.keySet());
+
+        assertTrue("Missing keys in the underlying store: " + missing, 
missing.isEmpty());
+
+        for (Integer key : exp)
+            assertEquals("Invalid value for key " + key, "val" + key, 
map.get(key));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
new file mode 100644
index 0000000..b31c58f
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.configuration.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Tests write-behind store with near and dht commit option.
+ */
+public class GridCacheWriteBehindStorePartitionedMultiNodeSelfTest extends 
GridCommonAbstractTest {
+    /** Grids to start. */
+    private static final int GRID_CNT = 5;
+
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Flush frequency. */
+    public static final int WRITE_BEHIND_FLUSH_FREQ = 1000;
+
+    /** Stores per grid. */
+    private GridCacheTestStore[] stores = new GridCacheTestStore[GRID_CNT];
+
+    /** Start grid counter. */
+    private int idx;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // All grids use the same static IP finder.
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(GridCacheMode.PARTITIONED);
+        cacheCfg.setWriteBehindEnabled(true);
+        cacheCfg.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQ);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+
+        // Every configuration gets its own store, remembered under the current start index.
+        GridCacheTestStore nodeStore = new GridCacheTestStore();
+
+        stores[idx] = nodeStore;
+
+        CacheStore cacheStore = nodeStore;
+
+        cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(cacheStore));
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setWriteThrough(true);
+        cacheCfg.setLoadPreviousValue(true);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        // Advance the index only after the configuration has been fully built.
+        idx++;
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // Drop the references to the per-grid stores before delegating to the superclass.
+        stores = null;
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Resets the start grid counter used by {@code getConfiguration(String)} to index the stores,
+     * then calls {@code startGrids(GRID_CNT)}.
+     *
+     * @throws Exception If failed.
+     */
+    private void prepare() throws Exception {
+        idx = 0;
+
+        startGrids(GRID_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        // Each check method starts its grids via prepare(), so they are stopped after every test.
+        stopAllGrids();
+    }
+
+    /**
+     * Runs the single-put scenario; delegates to {@link #checkSingleWrites()}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleWritesOnDhtNode() throws Exception {
+        checkSingleWrites();
+    }
+
+    /**
+     * Runs the {@code putAll} scenario; delegates to {@link #checkBatchWrites()}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBatchWritesOnDhtNode() throws Exception {
+        checkBatchWrites();
+    }
+
+    /**
+     * Runs the transactional put scenario; delegates to {@link #checkTxWrites()}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxWritesOnDhtNode() throws Exception {
+        checkTxWrites();
+    }
+
+    /**
+     * Starts the grids, puts keys 0..99 one at a time through the cache of grid 0 and verifies the stores.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkSingleWrites() throws Exception {
+        prepare();
+
+        GridCache<Integer, String> firstNodeCache = grid(0).cache(null);
+
+        int key = 0;
+
+        while (key < 100) {
+            firstNodeCache.put(key, String.valueOf(key));
+
+            key++;
+        }
+
+        checkWrites();
+    }
+
+    /**
+     * Starts the grids, writes keys 0..99 with a single {@code putAll} through the cache of grid 0 and
+     * verifies the stores.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkBatchWrites() throws Exception {
+        prepare();
+
+        Map<Integer, String> batch = new HashMap<>();
+
+        for (int key = 0; key < 100; key++)
+            batch.put(key, String.valueOf(key));
+
+        GridCache<Integer, String> firstNodeCache = grid(0).cache(null);
+
+        firstNodeCache.putAll(batch);
+
+        checkWrites();
+    }
+
+    /**
+     * Starts the grids, puts keys 0..99 through the cache of grid 0 inside one pessimistic
+     * repeatable-read transaction and verifies the stores after the commit.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkTxWrites() throws Exception {
+        prepare();
+
+        GridCache<Object, Object> firstNodeCache = grid(0).cache(null);
+
+        try (IgniteTx tx = firstNodeCache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            int key = 0;
+
+            while (key < 100) {
+                firstNodeCache.put(key, String.valueOf(key));
+
+                key++;
+            }
+
+            tx.commit();
+        }
+
+        checkWrites();
+    }
+
+    /**
+     * Sleeps for two flush periods and then verifies the per-grid stores: every store must have received
+     * some writes, no key may appear in more than one store, and together the stores must hold exactly
+     * the keys 0..99.
+     *
+     * @throws IgniteInterruptedException If sleep was interrupted.
+     */
+    private void checkWrites() throws IgniteInterruptedException {
+        U.sleep(WRITE_BEHIND_FLUSH_FREQ * 2);
+
+        Collection<Integer> allKeys = new ArrayList<>(100);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            Map<Integer, String> map = stores[i].getMap();
+
+            assertFalse("Missing writes for node: " + i, map.isEmpty());
+
+            allKeys.addAll(map.keySet());
+
+            // Check there is no intersection. The relation is symmetric, so each pair is compared once.
+            for (int j = i + 1; j < GRID_CNT; j++) {
+                Collection<Integer> intersection = new HashSet<>(stores[j].getMap().keySet());
+
+                intersection.retainAll(map.keySet());
+
+                assertTrue("Keys written to stores of both node " + i + " and node " + j + ": " + intersection,
+                    intersection.isEmpty());
+            }
+        }
+
+        assertEquals("Unexpected total number of stored keys", 100, allKeys.size());
+
+        for (int i = 0; i < 100; i++)
+            assertTrue("Key was not written to any store: " + i, allKeys.contains(i));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java
new file mode 100644
index 0000000..2a7599b
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+
+/**
+ * Tests {@link GridCacheWriteBehindStore} in partitioned configuration: runs the tests inherited from
+ * {@link GridCacheWriteBehindStoreAbstractTest} with {@link GridCacheMode#PARTITIONED}.
+ */
+public class GridCacheWriteBehindStorePartitionedTest extends 
GridCacheWriteBehindStoreAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return GridCacheMode.PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java
new file mode 100644
index 0000000..1ea5d63
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+
+/**
+ * Tests {@link GridCacheWriteBehindStore} in replicated configuration: runs the tests inherited from
+ * {@link GridCacheWriteBehindStoreAbstractTest} with {@link GridCacheMode#REPLICATED}.
+ */
+public class GridCacheWriteBehindStoreReplicatedTest extends 
GridCacheWriteBehindStoreAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return GridCacheMode.REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java
new file mode 100644
index 0000000..65eaa81
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * This class provides basic tests for {@link GridCacheWriteBehindStore}.
+ */
+public class GridCacheWriteBehindStoreSelfTest extends 
GridCacheWriteBehindStoreAbstractSelfTest {
+    /**
+     * Tests correct store shutdown when underlying store fails.
+     *
+     * @throws Exception If failed.
+     */
+    public void testShutdownWithFailure() throws Exception {
+        final AtomicReference<Exception> err = new AtomicReference<>();
+
+        multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    delegate.setShouldFail(true);
+
+                    initStore(2);
+
+                    try {
+                        store.write(new CacheEntryImpl<>(1, "val1"));
+                        store.write(new CacheEntryImpl<>(2, "val2"));
+                    }
+                    finally {
+                        shutdownStore();
+
+                        delegate.setShouldFail(false);
+                    }
+                }
+                catch (Exception e) {
+                    err.set(e);
+                }
+            }
+        }, 1).get();
+
+        if (err.get() != null)
+            throw err.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimpleStore() throws Exception {
+        initStore(2);
+
+        try {
+            store.write(new CacheEntryImpl<>(1, "v1"));
+            store.write(new CacheEntryImpl<>(2, "v2"));
+
+            assertEquals("v1", store.load(1));
+            assertEquals("v2", store.load(2));
+            assertNull(store.load(3));
+
+            store.delete(1);
+
+            assertNull(store.load(1));
+            assertEquals("v2", store.load(2));
+            assertNull(store.load(3));
+        }
+        finally {
+            shutdownStore();
+        }
+    }
+
+    /**
+     * Check that all values written to the store will be in underlying store 
after timeout or due to size limits.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"NullableProblems"})
+    public void testValuePropagation() throws Exception {
+        // Need to test size-based write.
+        initStore(1);
+
+        try {
+            for (int i = 0; i < CACHE_SIZE * 2; i++)
+                store.write(new CacheEntryImpl<>(i, "val" + i));
+
+            U.sleep(200);
+
+            for (int i = 0; i < CACHE_SIZE; i++) {
+                String val = delegate.load(i);
+
+                assertNotNull("Value for [key= " + i + "] was not written in 
store", val);
+                assertEquals("Invalid value [key=" + i + "]", "val" + i, val);
+            }
+
+            U.sleep(FLUSH_FREQUENCY + 300);
+
+            for (int i = CACHE_SIZE; i < CACHE_SIZE * 2; i++) {
+                String val = delegate.load(i);
+
+                assertNotNull("Value for [key= " + i + "] was not written in 
store", val);
+                assertEquals("Invalid value [key=" + i + "]", "val" + i, val);
+            }
+        }
+        finally {
+            shutdownStore();
+        }
+    }
+
+    /**
+     * Tests store behaviour under continuous put of the same key with 
different values.
+     *
+     * @throws Exception If failed.
+     */
+    public void testContinuousPut() throws Exception {
+        initStore(2);
+
+        try {
+            final AtomicBoolean running = new AtomicBoolean(true);
+
+            final AtomicInteger actualPutCnt = new AtomicInteger();
+
+            IgniteFuture<?> fut = multithreadedAsync(new Runnable() {
+                @SuppressWarnings({"NullableProblems"})
+                @Override public void run() {
+                    try {
+                        while (running.get()) {
+                            for (int i = 0; i < CACHE_SIZE; i++) {
+                                store.write(new CacheEntryImpl<>(i, "val-0"));
+
+                                actualPutCnt.incrementAndGet();
+
+                                store.write(new CacheEntryImpl<>(i, "val" + 
i));
+
+                                actualPutCnt.incrementAndGet();
+                            }
+                        }
+                    }
+                    catch (Exception e) {
+                        error("Unexpected exception in put thread", e);
+
+                        assert false;
+                    }
+                }
+            }, 1, "put");
+
+            U.sleep(FLUSH_FREQUENCY * 2 + 500);
+
+            int delegatePutCnt = delegate.getPutAllCount();
+
+            running.set(false);
+
+            fut.get();
+
+            log().info(">>> [putCnt = " + actualPutCnt.get() + ", 
delegatePutCnt=" + delegatePutCnt + "]");
+
+            assertTrue("No puts were made to the underlying store", 
delegatePutCnt > 0);
+            assertTrue("Too many puts were made to the underlying store", 
delegatePutCnt < actualPutCnt.get() / 10);
+        }
+        finally {
+            shutdownStore();
+        }
+
+        // These checks must be done after the store shut down
+        assertEquals("Invalid store size", CACHE_SIZE, 
delegate.getMap().size());
+
+        for (int i = 0; i < CACHE_SIZE; i++)
+            assertEquals("Invalid value stored", "val" + i, 
delegate.getMap().get(i));
+    }
+
+    /**
+     * Tests that all values that were put into the store will be written to the underlying store
+     * after shutdown is called.
+     *
+     * @throws Exception If failed.
+     */
+    public void testShutdown() throws Exception {
+        initStore(2);
+
+        try {
+            final AtomicBoolean running = new AtomicBoolean(true);
+
+            IgniteFuture<?> fut = multithreadedAsync(new Runnable() {
+                @SuppressWarnings({"NullableProblems"})
+                @Override public void run() {
+                    try {
+                        while (running.get()) {
+                            for (int i = 0; i < CACHE_SIZE; i++) {
+                                store.write(new CacheEntryImpl<>(i, "val-0"));
+
+                                store.write(new CacheEntryImpl<>(i, "val" + 
i));
+                            }
+                        }
+                    }
+                    catch (Exception e) {
+                        error("Unexpected exception in put thread", e);
+
+                        assert false;
+                    }
+                }
+            }, 1, "put");
+
+            U.sleep(300);
+
+            running.set(false);
+
+            fut.get();
+        }
+        finally {
+            shutdownStore();
+        }
+
+        // These checks must be done after the store shut down
+        assertEquals("Invalid store size", CACHE_SIZE, 
delegate.getMap().size());
+
+        for (int i = 0; i < CACHE_SIZE; i++)
+            assertEquals("Invalid value stored", "val" + i, 
delegate.getMap().get(i));
+    }
+
+    /**
+     * Tests that all values will be written to the underlying store
+     * right in the same order as they were put into the store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBatchApply() throws Exception {
+        delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap<Integer, 
String>());
+
+        initStore(1);
+
+        List<Integer> intList = new ArrayList<>(CACHE_SIZE);
+
+        try {
+            for (int i = 0; i < CACHE_SIZE; i++) {
+                store.write(new CacheEntryImpl<>(i, "val" + i));
+
+                intList.add(i);
+            }
+        }
+        finally {
+            shutdownStore();
+        }
+
+        Map<Integer, String> underlyingMap = delegate.getMap();
+
+        assertTrue("Store map key set: " + underlyingMap.keySet(), 
F.eqOrdered(underlyingMap.keySet(), intList));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java
new file mode 100644
index 0000000..b0286b6
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Tests for local transactions.
+ */
+@SuppressWarnings( {"BusyWait"})
+abstract class IgniteTxAbstractTest extends GridCommonAbstractTest {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Execution count. */
+    private static final AtomicInteger cntr = new AtomicInteger();
+
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Passes {@code false} to the super constructor, so no grid is auto-started; see {@link #beforeTestsStarted()}.
+     */
+    protected IgniteTxAbstractTest() {
+        super(false /*start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        return c;
+    }
+
+    /**
+     * @return Grid count.
+     */
+    protected abstract int gridCount();
+
+    /**
+     * @return Key count.
+     */
+    protected abstract int keyCount();
+
+    /**
+     * @return Maximum key value.
+     */
+    protected abstract int maxKeyValue();
+
+    /**
+     * @return Thread iterations.
+     */
+    protected abstract int iterations();
+
+    /**
+     * @return True if in-test logging is enabled.
+     */
+    protected abstract boolean isTestDebug();
+
+    /**
+     * @return {@code True} if memory stats should be printed.
+     */
+    protected abstract boolean printMemoryStats();
+
+    /** Logs {@code msg} via {@code info} only when {@link #isTestDebug()} returns {@code true}. */
+    private void debug(String msg) {
+        if (isTestDebug())
+            info(msg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 0; i < gridCount(); i++)
+            startGrid(i);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @param i Grid index.
+     * @return Cache.
+     */
+    @SuppressWarnings("unchecked")
+    @Override protected GridCache<Integer, String> cache(int i) {
+        return grid(i).cache(null);
+    }
+
+    /**
+     * @return Keys.
+     */
+    protected Iterable<Integer> getKeys() {
+        List<Integer> keys = new ArrayList<>(keyCount());
+
+        for (int i = 0; i < keyCount(); i++)
+            keys.add(RAND.nextInt(maxKeyValue()) + 1);
+
+        Collections.sort(keys);
+
+        return Collections.unmodifiableList(keys);
+    }
+
+    /**
+     * Picks one of the three cache operations at random.
+     *
+     * @return Random cache operation.
+     */
+    protected OP getOp() {
+        int idx = RAND.nextInt(3);
+
+        if (idx == 0)
+            return OP.READ;
+
+        if (idx == 1)
+            return OP.WRITE;
+
+        if (idx == 2)
+            return OP.REMOVE;
+
+        // Should never be reached.
+        assert false;
+
+        return null;
+    }
+
+    /**
+     * Runs {@link #iterations()} transactions on a randomly chosen grid. Each transaction applies
+     * a random operation (see {@link #getOp()}) to every key returned by {@link #getKeys()} and
+     * is then committed.
+     *
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If check failed.
+     */
+    protected void checkCommit(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception {
+        int gridIdx = RAND.nextInt(gridCount());
+
+        Ignite ignite = grid(gridIdx);
+
+        if (isTestDebug())
+            debug("Checking commit on grid: " + ignite.cluster().localNode().id());
+
+        for (int i = 0; i < iterations(); i++) {
+            GridCache<Integer, String> cache = cache(gridIdx);
+
+            IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0);
+
+            try {
+                int prevKey = -1;
+
+                for (Integer key : getKeys()) {
+                    // Make sure we have the same locking order for all concurrent transactions.
+                    assert key >= prevKey : "key: " + key + ", prevKey: " + prevKey;
+
+                    // Remember the key so that the next iteration really checks the ordering
+                    // (previously prevKey stayed at -1 and the assertion could never fail).
+                    prevKey = key;
+
+                    if (isTestDebug()) {
+                        GridCacheAffinityFunction aff = cache.configuration().getAffinity();
+
+                        int part = aff.partition(key);
+
+                        debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" +
+                            U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']');
+                    }
+
+                    String val = Integer.toString(key);
+
+                    switch (getOp()) {
+                        case READ: {
+                            if (isTestDebug())
+                                debug("Reading key [key=" + key + ", i=" + i + ']');
+
+                            val = cache.get(key);
+
+                            if (isTestDebug())
+                                debug("Read value for key [key=" + key + ", val=" + val + ']');
+
+                            break;
+                        }
+
+                        case WRITE: {
+                            if (isTestDebug())
+                                debug("Writing key and value [key=" + key + ", val=" + val + ", i=" + i + ']');
+
+                            cache.put(key, val);
+
+                            break;
+                        }
+
+                        case REMOVE: {
+                            if (isTestDebug())
+                                debug("Removing key [key=" + key + ", i=" + i  + ']');
+
+                            cache.remove(key);
+
+                            break;
+                        }
+
+                        default: { assert false; }
+                    }
+                }
+
+                tx.commit();
+
+                if (isTestDebug())
+                    debug("Committed transaction [i=" + i + ", tx=" + tx + ']');
+            }
+            catch (IgniteTxOptimisticException e) {
+                // Optimistic failures are only legitimate for OPTIMISTIC + SERIALIZABLE transactions.
+                if (concurrency != OPTIMISTIC || isolation != SERIALIZABLE) {
+                    error("Received invalid optimistic failure.", e);
+
+                    throw e;
+                }
+
+                if (isTestDebug())
+                    info("Optimistic transaction failure (will rollback) [i=" + i + ", msg=" + e.getMessage() +
+                        ", tx=" + tx.xid() + ']');
+
+                try {
+                    tx.rollback();
+                }
+                catch (IgniteCheckedException ex) {
+                    error("Failed to rollback optimistic failure: " + tx, ex);
+
+                    throw ex;
+                }
+            }
+            catch (Exception e) {
+                error("Transaction failed (will rollback): " + tx, e);
+
+                tx.rollback();
+
+                throw e;
+            }
+            catch (Error e) {
+                error("Error when executing transaction (will rollback): " + tx, e);
+
+                tx.rollback();
+
+                throw e;
+            }
+            finally {
+                IgniteTx t = cache.tx();
+
+                assert t == null : "Thread should not have transaction upon completion ['t==tx'=" + (t == tx) +
+                    ", t=" + t + (t != tx ? ", tx=" + tx : ", tx=''") + ']';
+            }
+        }
+
+        if (printMemoryStats()) {
+            if (cntr.getAndIncrement() % 100 == 0)
+                // Print transaction memory stats.
+                ((GridKernal)grid(gridIdx)).internalCache().context().tm().printMemoryStats();
+        }
+    }
+
+    /**
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If check failed.
+     */
+    protected void checkRollback(IgniteTxConcurrency concurrency, 
IgniteTxIsolation isolation)
+        throws Exception {
+        checkRollback(new ConcurrentHashMap<Integer, String>(), concurrency, 
isolation);
+    }
+
+    /**
+     * Runs {@link #iterations()} transactions on a randomly chosen grid. Each transaction applies
+     * a random operation (see {@link #getOp()}) to every key returned by {@link #getKeys()},
+     * passes each value returned by the cache to {@code checkMap} to be checked against
+     * {@code map}, and is then rolled back.
+     *
+     * @param map Map to check.
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If check failed.
+     */
+    protected void checkRollback(ConcurrentMap<Integer, String> map, IgniteTxConcurrency concurrency,
+        IgniteTxIsolation isolation) throws Exception {
+        int gridIdx = RAND.nextInt(gridCount());
+
+        Ignite ignite = grid(gridIdx);
+
+        if (isTestDebug())
+            debug("Checking rollback on grid: " + ignite.cluster().localNode().id());
+
+        for (int i = 0; i < iterations(); i++) {
+            GridCache<Integer, String> cache = cache(gridIdx);
+
+            IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0);
+
+            try {
+                for (Integer key : getKeys()) {
+                    if (isTestDebug()) {
+                        GridCacheAffinityFunction aff = cache.configuration().getAffinity();
+
+                        int part = aff.partition(key);
+
+                        debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" +
+                            U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']');
+                    }
+
+                    String val = Integer.toString(key);
+
+                    switch (getOp()) {
+                        case READ: {
+                            debug("Reading key: " + key);
+
+                            checkMap(map, key, cache.get(key));
+
+                            break;
+                        }
+
+                        case WRITE: {
+                            debug("Writing key and value [key=" + key + ", val=" + val + ']');
+
+                            checkMap(map, key, cache.put(key, val));
+
+                            break;
+                        }
+
+                        case REMOVE: {
+                            debug("Removing key: " + key);
+
+                            checkMap(map, key, cache.remove(key));
+
+                            break;
+                        }
+
+                        default: { assert false; }
+                    }
+                }
+
+                tx.rollback();
+
+                debug("Rolled back transaction: " + tx);
+            }
+            catch (IgniteTxOptimisticException e) {
+                tx.rollback();
+
+                log.warning("Rolled back transaction due to optimistic exception [tx=" + tx + ", e=" + e + ']');
+
+                throw e;
+            }
+            catch (Exception e) {
+                tx.rollback();
+
+                error("Rolled back transaction due to exception [tx=" + tx + ", e=" + e + ']');
+
+                throw e;
+            }
+            finally {
+                IgniteTx t1 = cache.tx();
+
+                debug("t1=" + t1);
+
+                assert t1 == null : "Thread should not have transaction upon completion ['t==tx'=" + (t1 == tx) +
+                    ", t=" + t1 + ']';
+            }
+        }
+    }
+
+    /**
+     * Records the first non-null value seen for a key and asserts that every later
+     * non-null value observed for that key is equal to it.
+     *
+     * @param map Map to check against.
+     * @param key Key.
+     * @param val Value ({@code null} values are ignored).
+     */
+    private void checkMap(ConcurrentMap<Integer, String> map, Integer key, String val) {
+        if (val == null)
+            return;
+
+        String prev = map.putIfAbsent(key, val);
+
+        assert prev == null || prev.equals(val);
+    }
+
+    /**
+     * Checks integrity of all caches after tests.
+     *
+     * @throws Exception If check failed.
+     */
+    @SuppressWarnings({"ErrorNotRethrown"})
+    protected void finalChecks() throws Exception {
+        for (int i = 1; i <= maxKeyValue(); i++) {
+            for (int k = 0; k < 3; k++) {
+                try {
+                    GridCacheEntry<Integer, String> e1 = null;
+
+                    String v1 = null;
+
+                    for (int j = 0; j < gridCount(); j++) {
+                        GridCache<Integer, String> cache = cache(j);
+
+                        IgniteTx tx = cache.tx();
+
+                        assertNull("Transaction is not completed: " + tx, tx);
+
+                        if (j == 0) {
+                            e1 = cache.entry(i);
+
+                            v1 = e1.get();
+                        }
+                        else {
+                            GridCacheEntry<Integer, String> e2 = 
cache.entry(i);
+
+                            String v2 = e2.get();
+
+                            if (!F.eq(v2, v1)) {
+                                v1 = e1.get();
+                                v2 = e2.get();
+                            }
+
+                            assert F.eq(v2, v1) :
+                                "Invalid cached value [key=" + i + ", v1=" + 
v1 + ", v2=" + v2 + ", e1=" + e1 +
+                                    ", e2=" + e2 + ", grid=" + j + ']';
+                        }
+                    }
+
+                    break;
+                }
+                catch (AssertionError e) {
+                    if (k == 2)
+                        throw e;
+                    else
+                        // Wait for transactions to complete.
+                        Thread.sleep(500);
+                }
+            }
+        }
+
+        for (int i = 1; i <= maxKeyValue(); i++) {
+            for (int k = 0; k < 3; k++) {
+                try {
+                    for (int j = 0; j < gridCount(); j++) {
+                        GridCacheProjection<Integer, String> cache = cache(j);
+
+                        cache.removeAll();
+
+//                        assert cache.keySet().isEmpty() : "Cache is not 
empty: " + cache.entrySet();
+                    }
+
+                    break;
+                }
+                catch (AssertionError e) {
+                    if (k == 2)
+                        throw e;
+                    else
+                        // Wait for transactions to complete.
+                        Thread.sleep(500);
+                }
+            }
+        }
+    }
+
+    /**
+     * Cache operation.
+     */
+    protected enum OP {
+        /** Cache read. */
+        READ,
+
+        /** Cache write. */
+        WRITE,
+
+        /** Cache remove. */
+        REMOVE
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConcurrentGetAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConcurrentGetAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConcurrentGetAbstractTest.java
new file mode 100644
index 0000000..ee07f72
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConcurrentGetAbstractTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Checks multithreaded put/get cache operations on one node.
+ */
+public abstract class IgniteTxConcurrentGetAbstractTest extends 
GridCommonAbstractTest {
+    /** Debug flag. */
+    private static final boolean DEBUG = false;
+
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int THREAD_NUM = 20;
+
+    /**
+     * Default constructor.
+     *
+     */
+    protected IgniteTxConcurrentGetAbstractTest() {
+        super(true /** Start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(spi);
+
+        return cfg;
+    }
+
+    /**
+     * @param g Grid.
+     * @return Near cache.
+     */
+    GridNearCacheAdapter<String, Integer> near(Ignite g) {
+        return (GridNearCacheAdapter<String, Integer>)((GridKernal)g).<String, 
Integer>internalCache();
+    }
+
+    /**
+     * @param g Grid.
+     * @return DHT cache.
+     */
+    GridDhtCacheAdapter<String, Integer> dht(Ignite g) {
+        return near(g).dht();
+    }
+
+    /**
+     * Puts a value under a random key and reads it back in transactions from multiple threads.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutGet() throws Exception {
+        // Random key.
+        final String key = UUID.randomUUID().toString();
+
+        final Ignite ignite = grid();
+
+        ignite.cache(null).put(key, "val");
+
+        GridCacheEntryEx<String,Integer> dhtEntry = dht(ignite).peekEx(key);
+
+        if (DEBUG)
+            info("DHT entry [hash=" + System.identityHashCode(dhtEntry) + ", 
entry=" + dhtEntry + ']');
+
+        String val = txGet(ignite, key);
+
+        assertNotNull(val);
+
+        info("Starting threads: " + THREAD_NUM);
+
+        multithreaded(new Callable<String>() {
+            @Override public String call() throws Exception {
+                return txGet(ignite, key);
+            }
+        }, THREAD_NUM, "getter-thread");
+    }
+
+    /**
+     * @param ignite Grid.
+     * @param key Key.
+     * @return Value.
+     * @throws Exception If failed.
+     */
+    private String txGet(Ignite ignite, String key) throws Exception {
+        try (IgniteTx tx = ignite.cache(null).txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+            GridCacheEntryEx<String, Integer> dhtEntry = 
dht(ignite).peekEx(key);
+
+            if (DEBUG)
+                info("DHT entry [hash=" + System.identityHashCode(dhtEntry) + 
", xid=" + tx.xid() +
+                    ", entry=" + dhtEntry + ']');
+
+            String val = ignite.<String, String>cache(null).get(key);
+
+            assertNotNull(val);
+            assertEquals("val", val);
+
+            tx.commit();
+
+            return val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
new file mode 100644
index 0000000..126da68
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.indexing.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.processor.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+
+/**
+ * Tests that transaction is invalidated in case of {@link 
IgniteTxHeuristicException}.
+ */
+public abstract class IgniteTxExceptionAbstractSelfTest extends 
GridCacheAbstractSelfTest {
+    /** Index SPI throwing exception. */
+    private static TestIndexingSpi idxSpi = new TestIndexingSpi();
+
+    /** Key type for {@code keyForNode}: the node is primary for the key. */
+    private static final int PRIMARY = 0;
+
+    /** Key type for {@code keyForNode}: the node is backup for the key. */
+    private static final int BACKUP = 1;
+
+    /** Key type for {@code keyForNode}: the node is neither primary nor backup for the key. */
+    private static final int NOT_PRIMARY_AND_BACKUP = 2;
+
+    /** Last key handed out by {@code keyForNode}; reset to 0 in {@code beforeTestsStarted}. */
+    private static Integer lastKey;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns 3.
+     */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On top of the parent configuration, installs the shared {@link TestIndexingSpi}
+     * instance as indexing SPI and enables serializable transactions.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setIndexingSpi(idxSpi);
+
+        cfg.getTransactionsConfiguration().setTxSerializableEnabled(true);
+
+        return cfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On top of the parent configuration, enables query indexing, clears the cache store
+     * factory, switches read-through and write-through off and enables loading of the
+     * previous value.
+     */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) 
throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setQueryIndexEnabled(true);
+        ccfg.setCacheStoreFactory(null);
+        ccfg.setReadThrough(false);
+        ccfg.setWriteThrough(false);
+        ccfg.setLoadPreviousValue(true);
+
+        return ccfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the parent set-up, resets the key counter used by {@code keyForNode} to 0.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        lastKey = 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Disarms the failing indexing SPI so that a flag left set by one test does not
+     * affect the next one.
+     * <p>
+     * NOTE(review): {@code super.afterTest()} is not invoked here - confirm that skipping
+     * the parent clean-up is intentional.
+     */
+    @Override protected void afterTest() throws Exception {
+        idxSpi.forceFail(false);
+    }
+
+    /**
+     * Runs {@code checkPut} for keys for which the local node of grid 0 is neither primary
+     * nor backup: first with a value put beforehand, then without. Each call gets a new key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutNear() throws Exception {
+        checkPut(true, keyForNode(grid(0).localNode(), 
NOT_PRIMARY_AND_BACKUP));
+
+        checkPut(false, keyForNode(grid(0).localNode(), 
NOT_PRIMARY_AND_BACKUP));
+    }
+
+    /**
+     * Runs {@code checkPut} for keys of type {@code PRIMARY} with respect to the local node
+     * of grid 0: first with a value put beforehand, then without.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutPrimary() throws Exception {
+        checkPut(true, keyForNode(grid(0).localNode(), PRIMARY));
+
+        checkPut(false, keyForNode(grid(0).localNode(), PRIMARY));
+    }
+
+    /**
+     * Runs {@code checkPut} for keys of type {@code BACKUP} with respect to the local node
+     * of grid 0: first with a value put beforehand, then without.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutBackup() throws Exception {
+        checkPut(true, keyForNode(grid(0).localNode(), BACKUP));
+
+        checkPut(false, keyForNode(grid(0).localNode(), BACKUP));
+    }
+
+    /**
+     * Runs {@code checkPutAll} with three {@code PRIMARY} keys of grid 0 (with and without
+     * values put beforehand) and, when more than one grid is started, repeats the same
+     * with three {@code PRIMARY} keys of grid 1. The operation itself is always issued
+     * through grid 0 (see {@code checkPutAll}).
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutAll() throws Exception {
+        checkPutAll(true, keyForNode(grid(0).localNode(), PRIMARY),
+            keyForNode(grid(0).localNode(), PRIMARY),
+            keyForNode(grid(0).localNode(), PRIMARY));
+
+        checkPutAll(false, keyForNode(grid(0).localNode(), PRIMARY),
+            keyForNode(grid(0).localNode(), PRIMARY),
+            keyForNode(grid(0).localNode(), PRIMARY));
+
+        if (gridCount() > 1) {
+            checkPutAll(true, keyForNode(grid(1).localNode(), PRIMARY),
+                keyForNode(grid(1).localNode(), PRIMARY),
+                keyForNode(grid(1).localNode(), PRIMARY));
+
+            checkPutAll(false, keyForNode(grid(1).localNode(), PRIMARY),
+                keyForNode(grid(1).localNode(), PRIMARY),
+                keyForNode(grid(1).localNode(), PRIMARY));
+        }
+    }
+
+    /**
+     * Runs {@code checkRemove} for keys for which the local node of grid 0 is neither
+     * primary nor backup: first without a value put beforehand, then with one.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemoveNear() throws Exception {
+        checkRemove(false, keyForNode(grid(0).localNode(), 
NOT_PRIMARY_AND_BACKUP));
+
+        checkRemove(true, keyForNode(grid(0).localNode(), 
NOT_PRIMARY_AND_BACKUP));
+    }
+
+    /**
+     * Runs {@code checkRemove} for keys of type {@code PRIMARY} with respect to the local
+     * node of grid 0: first without a value put beforehand, then with one.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemovePrimary() throws Exception {
+        checkRemove(false, keyForNode(grid(0).localNode(), PRIMARY));
+
+        checkRemove(true, keyForNode(grid(0).localNode(), PRIMARY));
+    }
+
+    /**
+     * Runs {@code checkRemove} for keys of type {@code BACKUP} with respect to the local
+     * node of grid 0: first without a value put beforehand, then with one.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemoveBackup() throws Exception {
+        checkRemove(false, keyForNode(grid(0).localNode(), BACKUP));
+
+        checkRemove(true, keyForNode(grid(0).localNode(), BACKUP));
+    }
+
+    /**
+     * Runs {@code checkTransform} for keys for which the local node of grid 0 is neither
+     * primary nor backup: first without a value put beforehand, then with one.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTransformNear() throws Exception {
+        checkTransform(false, keyForNode(grid(0).localNode(), 
NOT_PRIMARY_AND_BACKUP));
+
+        checkTransform(true, keyForNode(grid(0).localNode(), 
NOT_PRIMARY_AND_BACKUP));
+    }
+
+    /**
+     * Runs {@code checkTransform} for keys of type {@code PRIMARY} with respect to the
+     * local node of grid 0: first without a value put beforehand, then with one.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTransformPrimary() throws Exception {
+        checkTransform(false, keyForNode(grid(0).localNode(), PRIMARY));
+
+        checkTransform(true, keyForNode(grid(0).localNode(), PRIMARY));
+    }
+
+    /**
+     * Runs {@code checkTransform} for keys of type {@code BACKUP} with respect to the
+     * local node of grid 0: first without a value put beforehand, then with one.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTransformBackup() throws Exception {
+        checkTransform(false, keyForNode(grid(0).localNode(), BACKUP));
+
+        checkTransform(true, keyForNode(grid(0).localNode(), BACKUP));
+    }
+
+    /**
+     * For every combination of transaction concurrency and isolation, runs
+     * {@code checkPutTx} (with and without a value put beforehand) on a key for which the
+     * local node of grid 0 is neither primary nor backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutNearTx() throws Exception {
+        for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) {
+            for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) {
+                checkPutTx(true, concurrency, isolation, 
keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+
+                checkPutTx(false, concurrency, isolation, 
keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+            }
+        }
+    }
+
+    /**
+     * For every combination of transaction concurrency and isolation, runs
+     * {@code checkPutTx} (with and without a value put beforehand) on a key of type
+     * {@code PRIMARY} with respect to the local node of grid 0.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutPrimaryTx() throws Exception {
+        for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) {
+            for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) {
+                checkPutTx(true, concurrency, isolation, 
keyForNode(grid(0).localNode(), PRIMARY));
+
+                checkPutTx(false, concurrency, isolation, 
keyForNode(grid(0).localNode(), PRIMARY));
+            }
+        }
+    }
+
+    /**
+     * For every combination of transaction concurrency and isolation, runs
+     * {@code checkPutTx} (with and without a value put beforehand) on a key of type
+     * {@code BACKUP} with respect to the local node of grid 0.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutBackupTx() throws Exception {
+        for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) {
+            for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) {
+                checkPutTx(true, concurrency, isolation, 
keyForNode(grid(0).localNode(), BACKUP));
+
+                checkPutTx(false, concurrency, isolation, 
keyForNode(grid(0).localNode(), BACKUP));
+            }
+        }
+    }
+
+    /**
+     * For every combination of transaction concurrency and isolation, runs
+     * {@code checkPutTx} with three {@code PRIMARY} keys of grid 0 (with and without values
+     * put beforehand) and, when more than one grid is started, with three {@code PRIMARY}
+     * keys of grid 1. The transaction itself is always started on grid 0
+     * (see {@code checkPutTx}).
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutMultipleKeysTx() throws Exception {
+        for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) {
+            for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) {
+                checkPutTx(true, concurrency, isolation,
+                    keyForNode(grid(0).localNode(), PRIMARY),
+                    keyForNode(grid(0).localNode(), PRIMARY),
+                    keyForNode(grid(0).localNode(), PRIMARY));
+
+                checkPutTx(false, concurrency, isolation,
+                    keyForNode(grid(0).localNode(), PRIMARY),
+                    keyForNode(grid(0).localNode(), PRIMARY),
+                    keyForNode(grid(0).localNode(), PRIMARY));
+
+                if (gridCount() > 1) {
+                    checkPutTx(true, concurrency, isolation,
+                        keyForNode(grid(1).localNode(), PRIMARY),
+                        keyForNode(grid(1).localNode(), PRIMARY),
+                        keyForNode(grid(1).localNode(), PRIMARY));
+
+                    checkPutTx(false, concurrency, isolation,
+                        keyForNode(grid(1).localNode(), PRIMARY),
+                        keyForNode(grid(1).localNode(), PRIMARY),
+                        keyForNode(grid(1).localNode(), PRIMARY));
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks that an explicit transaction putting the given keys fails with
+     * {@link IgniteTxHeuristicException} when the indexing SPI is armed to fail, and that
+     * afterwards every key passes {@code checkEmpty}.
+     * <p>
+     * Steps: optionally commit value {@code 1} for all keys in a transaction on grid 0,
+     * read the keys on every grid, arm the SPI, then put value {@code 2} for all keys in a
+     * new transaction on grid 0 and commit it.
+     *
+     * @param putBefore If {@code true} then puts some value before executing 
failing operation.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @param keys Keys; at least one is required.
+     * @throws Exception If failed.
+     */
+    private void checkPutTx(boolean putBefore, IgniteTxConcurrency concurrency,
+        IgniteTxIsolation isolation, final Integer... keys) throws Exception {
+        assertTrue(keys.length > 0);
+
+        info("Test transaction [concurrency=" + concurrency + ", isolation=" + 
isolation + ']');
+
+        GridCache<Integer, Integer> cache = grid(0).cache(null);
+
+        if (putBefore) {
+            idxSpi.forceFail(false); // Make sure the preparatory put is not affected by the SPI.
+
+            info("Start transaction.");
+
+            try (IgniteTx tx = cache.txStart(concurrency, isolation)) {
+                for (Integer key : keys) {
+                    info("Put " + key);
+
+                    cache.put(key, 1);
+                }
+
+                info("Commit.");
+
+                tx.commit();
+            }
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++) {
+            for (Integer key : keys)
+                grid(i).cache(null).get(key);
+        }
+
+        idxSpi.forceFail(true); // Arm the SPI: its next store/remove call throws.
+
+        try {
+            info("Start transaction.");
+
+            try (IgniteTx tx = cache.txStart(concurrency, isolation)) {
+                for (Integer key : keys) {
+                    info("Put " + key);
+
+                    cache.put(key, 2);
+                }
+
+                info("Commit.");
+
+                tx.commit();
+            }
+
+            fail("Transaction should fail."); // Reached only if no exception was thrown above.
+        }
+        catch (IgniteTxHeuristicException e) {
+            log.info("Expected exception: " + e);
+        }
+
+        for (Integer key : keys)
+            checkEmpty(key);
+    }
+
+    /**
+     * Verifies that no grid holds a value or a lock for the given key.
+     * <p>
+     * The failing SPI is disarmed first. Then, for every grid, the entry taken from the
+     * internal cache map (and, if the cache is a near cache, from the underlying DHT cache
+     * map as well) must, if present, be neither locked nor hold a value. Finally a regular
+     * {@code get} on every grid must return {@code null}.
+     *
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkEmpty(final Integer key) throws Exception {
+        idxSpi.forceFail(false);
+
+        info("Check key: " + key);
+
+        for (int i = 0; i < gridCount(); i++) {
+            GridKernal grid = (GridKernal) grid(i);
+
+            GridCacheAdapter cache = grid.internalCache(null);
+
+            GridCacheMapEntry entry = cache.map().getEntry(key);
+
+            log.info("Entry: " + entry);
+
+            if (entry != null) {
+                assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + 
entry + ']', entry.lockedByAny());
+                assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + 
entry + ']', entry.hasValue());
+            }
+
+            if (cache.isNear()) {
+                entry = 
((GridNearCacheAdapter)cache).dht().map().getEntry(key);
+
+                log.info("Dht entry: " + entry);
+
+                if (entry != null) {
+                    assertFalse("Unexpected entry for grid [i=" + i + ", 
entry=" + entry + ']', entry.lockedByAny());
+                    assertFalse("Unexpected entry for grid [i=" + i + ", 
entry=" + entry + ']', entry.hasValue());
+                }
+            }
+        }
+
+        for (int i = 0; i < gridCount(); i++)
+            assertEquals("Unexpected value for grid " + i, null, 
grid(i).cache(null).get(key));
+    }
+
+    /**
+     * Checks that an implicit-transaction {@code put} issued through grid 0 throws
+     * {@link IgniteTxHeuristicException} when the indexing SPI is armed to fail, and that
+     * the key passes {@code checkEmpty} afterwards.
+     *
+     * @param putBefore If {@code true} then puts some value before executing 
failing operation.
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkPut(boolean putBefore, final Integer key) throws 
Exception {
+        if (putBefore) {
+            idxSpi.forceFail(false); // Make sure the preparatory put is not affected by the SPI.
+
+            info("Put key: " + key);
+
+            grid(0).cache(null).put(key, 1);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).cache(null).get(key);
+
+        idxSpi.forceFail(true); // Arm the SPI: its next store/remove call throws.
+
+        info("Going to put: " + key);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).cache(null).put(key, 2);
+
+                return null;
+            }
+        }, IgniteTxHeuristicException.class, null);
+
+        checkEmpty(key);
+    }
+
+    /**
+     * Checks that {@code invoke} with an entry processor setting value {@code 2}, issued
+     * through the JCache view of grid 0, throws {@link CacheException} caused by
+     * {@link IgniteTxHeuristicException} when the indexing SPI is armed to fail, and that
+     * the key passes {@code checkEmpty} afterwards.
+     * <p>
+     * NOTE(review): the message of the final {@code assertTrue} prints the outer exception
+     * {@code e}, not {@code e.getCause()}, although it is labelled "Unexpected cause".
+     *
+     * @param putBefore If {@code true} then puts some value before executing 
failing operation.
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkTransform(boolean putBefore, final Integer key) throws 
Exception {
+        if (putBefore) {
+            idxSpi.forceFail(false); // Make sure the preparatory put is not affected by the SPI.
+
+            info("Put key: " + key);
+
+            grid(0).cache(null).put(key, 1);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).cache(null).get(key);
+
+        idxSpi.forceFail(true); // Arm the SPI: its next store/remove call throws.
+
+        info("Going to transform: " + key);
+
+        Throwable e = GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).<Integer, Integer>jcache(null).invoke(key, new 
EntryProcessor<Integer, Integer, Void>() {
+                    @Override public Void process(MutableEntry<Integer, 
Integer> e, Object... args) {
+                        e.setValue(2);
+
+                        return null;
+                    }
+                });
+
+                return null;
+            }
+        }, CacheException.class, null);
+
+        assertTrue("Unexpected cause: "  +e, e.getCause() instanceof 
IgniteTxHeuristicException);
+
+        checkEmpty(key);
+    }
+
+    /**
+     * Checks that {@code putAll} of value {@code 2} for the given keys, issued through
+     * grid 0, throws {@link IgniteTxHeuristicException} when the indexing SPI is armed to
+     * fail, and that every key passes {@code checkEmpty} afterwards.
+     *
+     * @param putBefore If {@code true} then puts some value before executing 
failing operation.
+     * @param keys Keys; more than one is expected.
+     * @throws Exception If failed.
+     */
+    private void checkPutAll(boolean putBefore, Integer ... keys) throws 
Exception {
+        assert keys.length > 1; // Plain Java assert: evaluated only when assertions are enabled.
+
+        if (putBefore) {
+            idxSpi.forceFail(false); // Make sure the preparatory putAll is not affected by the SPI.
+
+            Map<Integer, Integer> m = new HashMap<>();
+
+            for (Integer key : keys)
+                m.put(key, 1);
+
+            info("Put data: " + m);
+
+            grid(0).cache(null).putAll(m);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++) {
+            for (Integer key : keys)
+                grid(i).cache(null).get(key);
+        }
+
+        idxSpi.forceFail(true); // Arm the SPI: its next store/remove call throws.
+
+        final Map<Integer, Integer> m = new HashMap<>();
+
+        for (Integer key : keys)
+            m.put(key, 2);
+
+        info("Going to putAll: " + m);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).cache(null).putAll(m);
+
+                return null;
+            }
+        }, IgniteTxHeuristicException.class, null);
+
+        for (Integer key : m.keySet())
+            checkEmpty(key);
+    }
+
+    /**
+     * Checks that {@code remove} issued through grid 0 throws
+     * {@link IgniteTxHeuristicException} when the indexing SPI is armed to fail, and that
+     * the key passes {@code checkEmpty} afterwards.
+     *
+     * @param putBefore If {@code true} then puts some value before executing 
failing operation.
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkRemove(boolean putBefore, final Integer key) throws 
Exception {
+        if (putBefore) {
+            idxSpi.forceFail(false); // Make sure the preparatory put is not affected by the SPI.
+
+            info("Put key: " + key);
+
+            grid(0).cache(null).put(key, 1);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).cache(null).get(key);
+
+        idxSpi.forceFail(true); // Arm the SPI: its next store/remove call throws.
+
+        info("Going to remove: " + key);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).cache(null).remove(key);
+
+                return null;
+            }
+        }, IgniteTxHeuristicException.class, null);
+
+        checkEmpty(key);
+    }
+
+    /**
+     * Generates key of a given type for given node.
+     * <p>
+     * The cache mode is read from the cache of grid 0. For a {@code LOCAL} cache, and for
+     * a {@code REPLICATED} cache when {@code NOT_PRIMARY_AND_BACKUP} is requested, the
+     * next sequential key is returned without consulting affinity. Otherwise keys from
+     * {@code lastKey + 1} up to (but excluding) {@code lastKey + 10_000} are probed and
+     * the first one whose affinity matches the requested type is returned. In every case
+     * {@code lastKey} is advanced to the returned key, so successive calls return
+     * different keys.
+     *
+     * @param node Node.
+     * @param type Key type: {@code PRIMARY}, {@code BACKUP} or {@code NOT_PRIMARY_AND_BACKUP}.
+     * @return Key.
+     * @throws IllegalStateException If no matching key is found in the probed range.
+     */
+    private Integer keyForNode(ClusterNode node, int type) {
+        GridCache<Integer, Integer> cache = grid(0).cache(null);
+
+        if (cache.configuration().getCacheMode() == LOCAL)
+            return ++lastKey;
+
+        if (cache.configuration().getCacheMode() == REPLICATED && type == 
NOT_PRIMARY_AND_BACKUP)
+            return ++lastKey;
+
+        for (int key = lastKey + 1; key < (lastKey + 10_000); key++) {
+            switch (type) {
+                case NOT_PRIMARY_AND_BACKUP: {
+                    if (!cache.affinity().isPrimaryOrBackup(node, key)) {
+                        lastKey = key;
+
+                        return key;
+                    }
+
+                    break;
+                }
+
+                case PRIMARY: {
+                    if (cache.affinity().isPrimary(node, key)) {
+                        lastKey = key;
+
+                        return key;
+                    }
+
+                    break;
+                }
+
+                case BACKUP: {
+                    if (cache.affinity().isBackup(node, key)) {
+                        lastKey = key;
+
+                        return key;
+                    }
+
+                    break;
+                }
+
+                default:
+                    fail(); // Unknown key type.
+            }
+        }
+
+        throw new IllegalStateException("Failed to find key.");
+    }
+
+    /**
+     * Indexing SPI that can fail on demand.
+     * <p>
+     * While the fail flag is set, the next call to {@code store} or {@code remove} clears
+     * the flag and throws {@link IgniteSpiException}. {@code query} is not supported; the
+     * swap and lifecycle callbacks do nothing.
+     */
+    private static class TestIndexingSpi extends IgniteSpiAdapter implements 
GridIndexingSpi {
+        /** Fail flag. */
+        private volatile boolean fail;
+
+        /**
+         * Arms or disarms the failure.
+         *
+         * @param fail Fail flag.
+         */
+        public void forceFail(boolean fail) {
+            this.fail = fail;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Not supported by this test SPI: always throws {@link UnsupportedOperationException}.
+         */
+        @Override public Iterator<?> query(@Nullable String spaceName, 
Collection<Object> params, @Nullable GridIndexingQueryFilter filters) throws 
IgniteSpiException {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Throws {@link IgniteSpiException} if the fail flag is set; does nothing otherwise.
+         */
+        @Override public void store(@Nullable String spaceName, Object key, 
Object val, long expirationTime)
+            throws IgniteSpiException {
+            if (fail) {
+                fail = false; // Reset the flag before throwing.
+
+                throw new IgniteSpiException("Test exception.");
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Throws {@link IgniteSpiException} if the fail flag is set; does nothing otherwise.
+         */
+        @Override public void remove(@Nullable String spaceName, Object k)
+            throws IgniteSpiException {
+            if (fail) {
+                fail = false; // Reset the flag before throwing.
+
+                throw new IgniteSpiException("Test exception.");
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSwap(@Nullable String spaceName, Object key) 
throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUnswap(@Nullable String spaceName, Object key, 
Object val) throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStart(@Nullable String gridName) throws 
IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStop() throws IgniteSpiException {
+            // No-op.
+        }
+    }
+}

Reply via email to