Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-gg-9973 [created] 038f610e8


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java
deleted file mode 100644
index 305b8bb..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.apache.ignite.transactions.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.configuration.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.transactions.TransactionConcurrency.*;
-import static org.apache.ignite.transactions.TransactionIsolation.*;
-
-/**
- * Basic store test.
- */
-public abstract class GridCacheWriteBehindStoreAbstractTest extends GridCommonAbstractTest {
-    /** Flush frequency. */
-    private static final int WRITE_FROM_BEHIND_FLUSH_FREQUENCY = 1000;
-
-    /** Cache store. */
-    private static final GridCacheTestStore store = new GridCacheTestStore();
-
-    /**
-     *
-     */
-    protected GridCacheWriteBehindStoreAbstractTest() {
-        super(true /*start grid. */);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        store.resetTimestamp();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        IgniteCache<?, ?> cache = jcache();
-
-        if (cache != null)
-            cache.clear();
-
-        store.reset();
-    }
-
-    /** @return Caching mode. */
-    protected abstract CacheMode cacheMode();
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected final IgniteConfiguration getConfiguration() throws Exception {
-        IgniteConfiguration c = super.getConfiguration();
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
-        c.setDiscoverySpi(disco);
-
-        CacheConfiguration cc = defaultCacheConfiguration();
-
-        cc.setCacheMode(cacheMode());
-        cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        cc.setSwapEnabled(false);
-        cc.setAtomicityMode(TRANSACTIONAL);
-
-        cc.setCacheStoreFactory(singletonFactory(store));
-        cc.setReadThrough(true);
-        cc.setWriteThrough(true);
-        cc.setLoadPreviousValue(true);
-
-        cc.setWriteBehindEnabled(true);
-        cc.setWriteBehindFlushFrequency(WRITE_FROM_BEHIND_FLUSH_FREQUENCY);
-
-        c.setCacheConfiguration(cc);
-
-        return c;
-    }
-
-    /** @throws Exception If test fails. */
-    public void testWriteThrough() throws Exception {
-        IgniteCache<Integer, String> cache = jcache();
-
-        Map<Integer, String> map = store.getMap();
-
-        assert map.isEmpty();
-
-        Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ);
-
-        try {
-            for (int i = 1; i <= 10; i++) {
-                cache.put(i, Integer.toString(i));
-
-                checkLastMethod(null);
-            }
-
-            tx.commit();
-        }
-        finally {
-            tx.close();
-        }
-
-        // Need to wait WFB flush timeout.
-        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
-
-        checkLastMethod("putAll");
-
-        assert cache.size() == 10;
-
-        for (int i = 1; i <= 10; i++) {
-            String val = map.get(i);
-
-            assert val != null;
-            assert val.equals(Integer.toString(i));
-        }
-
-        store.resetLastMethod();
-
-        tx = grid().transactions().txStart();
-
-        try {
-            for (int i = 1; i <= 10; i++) {
-                String val = cache.getAndRemove(i);
-
-                checkLastMethod(null);
-
-                assert val != null;
-                assert val.equals(Integer.toString(i));
-            }
-
-            tx.commit();
-        }
-        finally {
-            tx.close();
-        }
-
-        // Need to wait WFB flush timeout.
-        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
-
-        checkLastMethod("removeAll");
-
-        assert map.isEmpty();
-    }
-
-    /** @throws Exception If test failed. */
-    public void testReadThrough() throws Exception {
-        IgniteCache<Integer, String> cache = jcache();
-
-        Map<Integer, String> map = store.getMap();
-
-        assert map.isEmpty();
-
-        try (Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
-            for (int i = 1; i <= 10; i++)
-                cache.put(i, Integer.toString(i));
-
-            checkLastMethod(null);
-
-            tx.commit();
-        }
-
-        // Need to wait WFB flush timeout.
-        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
-
-        checkLastMethod("putAll");
-
-        for (int i = 1; i <= 10; i++) {
-            String val = map.get(i);
-
-            assert val != null;
-            assert val.equals(Integer.toString(i));
-        }
-
-        cache.clear();
-
-        assert cache.localSize() == 0;
-        assert cache.localSize() == 0;
-
-        // Need to wait WFB flush timeout.
-        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
-
-        assert map.size() == 10;
-
-        for (int i = 1; i <= 10; i++) {
-            // Read through.
-            String val = cache.get(i);
-
-            checkLastMethod("load");
-
-            assert val != null;
-            assert val.equals(Integer.toString(i));
-        }
-
-        assert cache.size() == 10;
-
-        cache.clear();
-
-        assert cache.localSize() == 0;
-        assert cache.localSize() == 0;
-
-        assert map.size() == 10;
-
-        Set<Integer> keys = new HashSet<>();
-
-        for (int i = 1; i <= 10; i++)
-            keys.add(i);
-
-        // Read through.
-        Map<Integer, String> vals = cache.getAll(keys);
-
-        checkLastMethod("loadAll");
-
-        assert vals != null;
-        assert vals.size() == 10;
-
-        for (int i = 1; i <= 10; i++) {
-            String val = vals.get(i);
-
-            assert val != null;
-            assert val.equals(Integer.toString(i));
-        }
-
-        // Write through.
-        cache.removeAll(keys);
-
-        // Need to wait WFB flush timeout.
-        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
-
-        checkLastMethod("removeAll");
-
-        assert cache.localSize() == 0;
-        assert cache.localSize() == 0;
-
-        assert map.isEmpty();
-    }
-
-    /** @throws Exception If failed. */
-    public void testMultithreaded() throws Exception {
-        final ConcurrentMap<String, Set<Integer>> perThread = new ConcurrentHashMap<>();
-
-        final AtomicBoolean running = new AtomicBoolean(true);
-
-        final IgniteCache<Integer, String> cache = jcache();
-
-        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-            @SuppressWarnings({"NullableProblems"})
-            @Override public void run() {
-                // Initialize key set for this thread.
-                Set<Integer> set = new HashSet<>();
-
-                Set<Integer> old = perThread.putIfAbsent(Thread.currentThread().getName(), set);
-
-                if (old != null)
-                    set = old;
-
-                Random rnd = new Random();
-
-                int keyCnt = 20000;
-
-                while (running.get()) {
-                    int op = rnd.nextInt(2);
-                    int key = rnd.nextInt(keyCnt);
-
-                    switch (op) {
-                        case 0:
-                            cache.put(key, "val" + key);
-                            set.add(key);
-
-                            break;
-
-                        case 1:
-                        default:
-                            cache.remove(key);
-                            set.remove(key);
-
-                            break;
-                    }
-                }
-            }
-        }, 10, "put");
-
-        U.sleep(10000);
-
-        running.set(false);
-
-        fut.get();
-
-        U.sleep(5 * WRITE_FROM_BEHIND_FLUSH_FREQUENCY);
-
-        Map<Integer, String> stored = store.getMap();
-
-        for (Map.Entry<Integer, String> entry : stored.entrySet()) {
-            int key = entry.getKey();
-
-            assertEquals("Invalid value for key " + key, "val" + key, entry.getValue());
-
-            boolean found = false;
-
-            for (Set<Integer> threadPuts : perThread.values()) {
-                if (threadPuts.contains(key)) {
-                    found = true;
-
-                    break;
-                }
-            }
-
-            assert found : "No threads found that put key " + key;
-        }
-    }
-
-    /** @param mtd Expected last method value. */
-    private void checkLastMethod(@Nullable String mtd) {
-        String lastMtd = store.getLastMethod();
-
-        if (mtd == null)
-            assert lastMtd == null : "Last method must be null: " + lastMtd;
-        else {
-            assert lastMtd != null : "Last method must be not null";
-            assert lastMtd.equals(mtd) : "Last method does not match [expected=" + mtd + ", lastMtd=" + lastMtd + ']';
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java
deleted file mode 100644
index 6c050ca..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.cache.*;
-
-/**
- * Tests {@link GridCacheWriteBehindStore} in grid configuration.
- */
-public class GridCacheWriteBehindStoreLocalTest extends GridCacheWriteBehindStoreAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return CacheMode.LOCAL;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java
deleted file mode 100644
index 9607784..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-
-/**
- * Multithreaded tests for {@link GridCacheWriteBehindStore}.
- */
-public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest {
-    /**
-     * This test performs complex set of operations on store from multiple threads.
-     *
-     * @throws Exception If failed.
-     */
-    public void testPutGetRemove() throws Exception {
-        initStore(2);
-
-        Set<Integer> exp;
-
-        try {
-            exp = runPutGetRemoveMultithreaded(10, 10);
-        }
-        finally {
-            shutdownStore();
-        }
-
-        Map<Integer, String> map = delegate.getMap();
-
-        Collection<Integer> extra = new HashSet<>(map.keySet());
-
-        extra.removeAll(exp);
-
-        assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty());
-
-        Collection<Integer> missing = new HashSet<>(exp);
-
-        missing.removeAll(map.keySet());
-
-        assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty());
-
-        for (Integer key : exp)
-            assertEquals("Invalid value for key " + key, "val" + key, map.get(key));
-    }
-
-    /**
-     * Tests that cache would keep values if underlying store fails.
-     *
-     * @throws Exception If failed.
-     */
-    public void testStoreFailure() throws Exception {
-        delegate.setShouldFail(true);
-
-        initStore(2);
-
-        Set<Integer> exp;
-
-        try {
-            exp = runPutGetRemoveMultithreaded(10, 10);
-
-            U.sleep(FLUSH_FREQUENCY);
-
-            info(">>> There are " + store.getWriteBehindErrorRetryCount() + " entries in RETRY state");
-
-            delegate.setShouldFail(false);
-
-            // Despite that we set shouldFail flag to false, flush thread may just have caught an exception.
-            // If we move store to the stopping state right away, this value will be lost. That's why this sleep
-            // is inserted here to let all exception handlers in write-behind store exit.
-            U.sleep(1000);
-        }
-        finally {
-            shutdownStore();
-        }
-
-        Map<Integer, String> map = delegate.getMap();
-
-        Collection<Integer> extra = new HashSet<>(map.keySet());
-
-        extra.removeAll(exp);
-
-        assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty());
-
-        Collection<Integer> missing = new HashSet<>(exp);
-
-        missing.removeAll(map.keySet());
-
-        assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty());
-
-        for (Integer key : exp)
-            assertEquals("Invalid value for key " + key, "val" + key, map.get(key));
-    }
-
-    /**
-     * Tests store consistency in case of high put rate, when flush is performed from the same thread
-     * as put or remove operation.
-     *
-     * @throws Exception If failed.
-     */
-    public void testFlushFromTheSameThread() throws Exception {
-        // 50 milliseconds should be enough.
-        delegate.setOperationDelay(50);
-
-        initStore(2);
-
-        Set<Integer> exp;
-
-        int start = store.getWriteBehindTotalCriticalOverflowCount();
-
-        try {
-            //We will have in total 5 * CACHE_SIZE keys that should be enough to grow map size to critical value.
-            exp = runPutGetRemoveMultithreaded(5, CACHE_SIZE);
-        }
-        finally {
-            log.info(">>> Done inserting, shutting down the store");
-
-            shutdownStore();
-        }
-
-        // Restore delay.
-        delegate.setOperationDelay(0);
-
-        Map<Integer, String> map = delegate.getMap();
-
-        int end = store.getWriteBehindTotalCriticalOverflowCount();
-
-        log.info(">>> There are " + exp.size() + " keys in store, " + (end - start) + " overflows detected");
-
-        assertTrue("No cache overflows detected (a bug or too few keys or too few delay?)", end > start);
-
-        Collection<Integer> extra = new HashSet<>(map.keySet());
-
-        extra.removeAll(exp);
-
-        assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty());
-
-        Collection<Integer> missing = new HashSet<>(exp);
-
-        missing.removeAll(map.keySet());
-
-        assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty());
-
-        for (Integer key : exp)
-            assertEquals("Invalid value for key " + key, "val" + key, map.get(key));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
deleted file mode 100644
index 8fb4f68..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.apache.ignite.transactions.*;
-
-import java.util.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.transactions.TransactionConcurrency.*;
-import static org.apache.ignite.transactions.TransactionIsolation.*;
-
-/**
- * Tests write-behind store with near and dht commit option.
- */
-public class GridCacheWriteBehindStorePartitionedMultiNodeSelfTest extends GridCommonAbstractTest {
-    /** Grids to start. */
-    private static final int GRID_CNT = 5;
-
-    /** Ip finder. */
-    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** Flush frequency. */
-    public static final int WRITE_BEHIND_FLUSH_FREQ = 1000;
-
-    /** Stores per grid. */
-    private GridCacheTestStore[] stores = new GridCacheTestStore[GRID_CNT];
-
-    /** Start grid counter. */
-    private int idx;
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration c = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(ipFinder);
-
-        c.setDiscoverySpi(disco);
-
-        CacheConfiguration cc = defaultCacheConfiguration();
-
-        cc.setCacheMode(CacheMode.PARTITIONED);
-        cc.setWriteBehindEnabled(true);
-        cc.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQ);
-        cc.setAtomicityMode(TRANSACTIONAL);
-        cc.setNearConfiguration(new NearCacheConfiguration());
-
-        CacheStore store = stores[idx] = new GridCacheTestStore();
-
-        cc.setCacheStoreFactory(singletonFactory(store));
-        cc.setReadThrough(true);
-        cc.setWriteThrough(true);
-        cc.setLoadPreviousValue(true);
-
-        c.setCacheConfiguration(cc);
-
-        idx++;
-
-        return c;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stores = null;
-
-        super.afterTestsStopped();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void prepare() throws Exception {
-        idx = 0;
-
-        startGrids(GRID_CNT);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        super.afterTest();
-
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSingleWritesOnDhtNode() throws Exception {
-        checkSingleWrites();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBatchWritesOnDhtNode() throws Exception {
-        checkBatchWrites();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxWritesOnDhtNode() throws Exception {
-        checkTxWrites();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkSingleWrites() throws Exception {
-        prepare();
-
-        IgniteCache<Integer, String> cache = grid(0).cache(null);
-
-        for (int i = 0; i < 100; i++)
-            cache.put(i, String.valueOf(i));
-
-        checkWrites();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkBatchWrites() throws Exception {
-        prepare();
-
-        Map<Integer, String> map = new HashMap<>();
-
-        for (int i = 0; i < 100; i++)
-            map.put(i, String.valueOf(i));
-
-        grid(0).cache(null).putAll(map);
-
-        checkWrites();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkTxWrites() throws Exception {
-        prepare();
-
-        IgniteCache<Object, Object> cache = grid(0).cache(null);
-
-        try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-            for (int i = 0; i < 100; i++)
-                cache.put(i, String.valueOf(i));
-
-            tx.commit();
-        }
-
-        checkWrites();
-    }
-
-    /**
-     * @throws IgniteInterruptedCheckedException If sleep was interrupted.
-     */
-    private void checkWrites() throws IgniteInterruptedCheckedException {
-        U.sleep(WRITE_BEHIND_FLUSH_FREQ * 2);
-
-        Collection<Integer> allKeys = new ArrayList<>(100);
-
-        for (int i = 0; i < GRID_CNT; i++) {
-            Map<Integer,String> map = stores[i].getMap();
-
-            assertFalse("Missing writes for node: " + i, map.isEmpty());
-
-            allKeys.addAll(map.keySet());
-
-            // Check there is no intersection.
-            for (int j = 0; j < GRID_CNT; j++) {
-                if (i == j)
-                    continue;
-
-                Collection<Integer> intersection = new HashSet<>(stores[j].getMap().keySet());
-
-                intersection.retainAll(map.keySet());
-
-                assertTrue(intersection.isEmpty());
-            }
-        }
-
-        assertEquals(100, allKeys.size());
-
-        for (int i = 0; i < 100; i++)
-            assertTrue(allKeys.contains(i));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java
deleted file mode 100644
index f9e454f..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.cache.*;
-
-/**
- * Tests {@link GridCacheWriteBehindStore} in partitioned configuration.
- */
-public class GridCacheWriteBehindStorePartitionedTest extends GridCacheWriteBehindStoreAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return CacheMode.PARTITIONED;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java
deleted file mode 100644
index c809f90..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.cache.*;
-
-/**
- * Tests {@link GridCacheWriteBehindStore} in grid configuration.
- */
-public class GridCacheWriteBehindStoreReplicatedTest extends GridCacheWriteBehindStoreAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return CacheMode.REPLICATED;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java
deleted file mode 100644
index a3b1072..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jsr166.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * This class provides basic tests for {@link GridCacheWriteBehindStore}.
- */
-public class GridCacheWriteBehindStoreSelfTest extends 
GridCacheWriteBehindStoreAbstractSelfTest {
-    /**
-     * Tests correct store shutdown when underlying store fails,
-     *
-     * @throws Exception If failed.
-     */
-    public void testShutdownWithFailure() throws Exception {
-        final AtomicReference<Exception> err = new AtomicReference<>();
-
-        multithreadedAsync(new Runnable() {
-            @Override public void run() {
-                try {
-                    delegate.setShouldFail(true);
-
-                    initStore(2);
-
-                    try {
-                        store.write(new CacheEntryImpl<>(1, "val1"));
-                        store.write(new CacheEntryImpl<>(2, "val2"));
-                    }
-                    finally {
-                        shutdownStore();
-
-                        delegate.setShouldFail(false);
-                    }
-                }
-                catch (Exception e) {
-                    err.set(e);
-                }
-            }
-        }, 1).get();
-
-        if (err.get() != null)
-            throw err.get();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSimpleStore() throws Exception {
-        initStore(2);
-
-        try {
-            store.write(new CacheEntryImpl<>(1, "v1"));
-            store.write(new CacheEntryImpl<>(2, "v2"));
-
-            assertEquals("v1", store.load(1));
-            assertEquals("v2", store.load(2));
-            assertNull(store.load(3));
-
-            store.delete(1);
-
-            assertNull(store.load(1));
-            assertEquals("v2", store.load(2));
-            assertNull(store.load(3));
-        }
-        finally {
-            shutdownStore();
-        }
-    }
-
-    /**
-     * Check that all values written to the store will be in underlying store 
after timeout or due to size limits.
-     *
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings({"NullableProblems"})
-    public void testValuePropagation() throws Exception {
-        // Need to test size-based write.
-        initStore(1);
-
-        try {
-            for (int i = 0; i < CACHE_SIZE * 2; i++)
-                store.write(new CacheEntryImpl<>(i, "val" + i));
-
-            U.sleep(200);
-
-            for (int i = 0; i < CACHE_SIZE; i++) {
-                String val = delegate.load(i);
-
-                assertNotNull("Value for [key= " + i + "] was not written in 
store", val);
-                assertEquals("Invalid value [key=" + i + "]", "val" + i, val);
-            }
-
-            U.sleep(FLUSH_FREQUENCY + 300);
-
-            for (int i = CACHE_SIZE; i < CACHE_SIZE * 2; i++) {
-                String val = delegate.load(i);
-
-                assertNotNull("Value for [key= " + i + "] was not written in 
store", val);
-                assertEquals("Invalid value [key=" + i + "]", "val" + i, val);
-            }
-        }
-        finally {
-            shutdownStore();
-        }
-    }
-
-    /**
-     * Tests store behaviour under continuous put of the same key with 
different values.
-     *
-     * @throws Exception If failed
-     */
-    public void testContinuousPut() throws Exception {
-        initStore(2);
-
-        try {
-            final AtomicBoolean running = new AtomicBoolean(true);
-
-            final AtomicInteger actualPutCnt = new AtomicInteger();
-
-            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-                @SuppressWarnings({"NullableProblems"})
-                @Override public void run() {
-                    try {
-                        while (running.get()) {
-                            for (int i = 0; i < CACHE_SIZE; i++) {
-                                store.write(new CacheEntryImpl<>(i, "val-0"));
-
-                                actualPutCnt.incrementAndGet();
-
-                                store.write(new CacheEntryImpl<>(i, "val" + 
i));
-
-                                actualPutCnt.incrementAndGet();
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        error("Unexpected exception in put thread", e);
-
-                        assert false;
-                    }
-                }
-            }, 1, "put");
-
-            U.sleep(FLUSH_FREQUENCY * 2 + 500);
-
-            int delegatePutCnt = delegate.getPutAllCount();
-
-            running.set(false);
-
-            fut.get();
-
-            log().info(">>> [putCnt = " + actualPutCnt.get() + ", 
delegatePutCnt=" + delegatePutCnt + "]");
-
-            assertTrue("No puts were made to the underlying store", 
delegatePutCnt > 0);
-            assertTrue("Too many puts were made to the underlying store", 
delegatePutCnt < actualPutCnt.get() / 10);
-        }
-        finally {
-            shutdownStore();
-        }
-
-        // These checks must be done after the store shut down
-        assertEquals("Invalid store size", CACHE_SIZE, 
delegate.getMap().size());
-
-        for (int i = 0; i < CACHE_SIZE; i++)
-            assertEquals("Invalid value stored", "val" + i, 
delegate.getMap().get(i));
-    }
-
-    /**
-     * Tests that all values were put into the store will be written to the 
underlying store
-     * after shutdown is called.
-     *
-     * @throws Exception If failed.
-     */
-    public void testShutdown() throws Exception {
-        initStore(2);
-
-        try {
-            final AtomicBoolean running = new AtomicBoolean(true);
-
-            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-                @SuppressWarnings({"NullableProblems"})
-                @Override public void run() {
-                    try {
-                        while (running.get()) {
-                            for (int i = 0; i < CACHE_SIZE; i++) {
-                                store.write(new CacheEntryImpl<>(i, "val-0"));
-
-                                store.write(new CacheEntryImpl<>(i, "val" + 
i));
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        error("Unexpected exception in put thread", e);
-
-                        assert false;
-                    }
-                }
-            }, 1, "put");
-
-            U.sleep(300);
-
-            running.set(false);
-
-            fut.get();
-        }
-        finally {
-            shutdownStore();
-        }
-
-        // These checks must be done after the store shut down
-        assertEquals("Invalid store size", CACHE_SIZE, 
delegate.getMap().size());
-
-        for (int i = 0; i < CACHE_SIZE; i++)
-            assertEquals("Invalid value stored", "val" + i, 
delegate.getMap().get(i));
-    }
-
-    /**
-     * Tests that all values will be written to the underlying store
-     * right in the same order as they were put into the store.
-     *
-     * @throws Exception If failed.
-     */
-    public void testBatchApply() throws Exception {
-        delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap<Integer, 
String>());
-
-        initStore(1);
-
-        List<Integer> intList = new ArrayList<>(CACHE_SIZE);
-
-        try {
-            for (int i = 0; i < CACHE_SIZE; i++) {
-                store.write(new CacheEntryImpl<>(i, "val" + i));
-
-                intList.add(i);
-            }
-        }
-        finally {
-            shutdownStore();
-        }
-
-        Map<Integer, String> underlyingMap = delegate.getMap();
-
-        assertTrue("Store map key set: " + underlyingMap.keySet(), 
F.eqOrdered(underlyingMap.keySet(), intList));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
new file mode 100644
index 0000000..c1dd081
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Harness for {@link GridCacheWriteBehindStore} tests.
+ */
+public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends 
GridCommonAbstractTest {
+    /** Write cache size. */
+    public static final int CACHE_SIZE = 1024;
+
+    /** Value dump interval. */
+    public static final int FLUSH_FREQUENCY = 1000;
+
+    /** Underlying store. */
+    protected GridCacheTestStore delegate = new GridCacheTestStore();
+
+    /** Tested store. */
+    protected GridCacheWriteBehindStore<Integer, String> store;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        delegate = null;
+        store = null;
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Initializes store.
+     *
+     * @param flushThreadCnt Count of flush threads
+     * @throws Exception If failed.
+     */
+    protected void initStore(int flushThreadCnt) throws Exception {
+        store = new GridCacheWriteBehindStore<>(null, "", "", log, delegate);
+
+        store.setFlushFrequency(FLUSH_FREQUENCY);
+
+        store.setFlushSize(CACHE_SIZE);
+
+        store.setFlushThreadCount(flushThreadCnt);
+
+        delegate.reset();
+
+        store.start();
+    }
+
+    /**
+     * Shuts down store.
+     *
+     * @throws Exception If failed.
+     */
+    protected void shutdownStore() throws Exception {
+        store.stop();
+
+        assertTrue("Store cache must be empty after shutdown", 
store.writeCache().isEmpty());
+    }
+
+    /**
+     * Performs multiple put, get and remove operations in several threads on a store. After
+     * all threads have finished their operations, returns the total set of keys that should be
+     * in the underlying store.
+     *
+     * @param threadCnt Count of threads that should update keys.
+     * @param keysPerThread Count of unique keys assigned to a thread.
+     * @return Set of keys expected to remain in the store after all threads finish.
+     * @throws Exception If failed.
+     */
+    protected Set<Integer> runPutGetRemoveMultithreaded(int threadCnt, final 
int keysPerThread) throws Exception {
+        final ConcurrentMap<String, Set<Integer>> perThread = new 
ConcurrentHashMap<>();
+
+        final AtomicBoolean running = new AtomicBoolean(true);
+
+        final AtomicInteger cntr = new AtomicInteger();
+
+        final AtomicInteger operations = new AtomicInteger();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+            @SuppressWarnings({"NullableProblems"})
+            @Override public void run() {
+                // Initialize key set for this thread.
+                Set<Integer> set = new HashSet<>();
+
+                Set<Integer> old = 
perThread.putIfAbsent(Thread.currentThread().getName(), set);
+
+                if (old != null)
+                    set = old;
+
+                List<Integer> original = new ArrayList<>();
+
+                Random rnd = new Random();
+
+                for (int i = 0; i < keysPerThread; i++)
+                    original.add(cntr.getAndIncrement());
+
+                try {
+                    while (running.get()) {
+                        int op = rnd.nextInt(3);
+                        int idx = rnd.nextInt(keysPerThread);
+
+                        int key = original.get(idx);
+
+                        switch (op) {
+                            case 0:
+                                store.write(new CacheEntryImpl<>(key, "val" + 
key));
+                                set.add(key);
+
+                                operations.incrementAndGet();
+
+                                break;
+
+                            case 1:
+                                store.delete(key);
+                                set.remove(key);
+
+                                operations.incrementAndGet();
+
+                                break;
+
+                            case 2:
+                            default:
+                                store.write(new CacheEntryImpl<>(key, 
"broken"));
+
+                                String val = store.load(key);
+
+                                assertEquals("Invalid intermediate value: " + 
val, "broken", val);
+
+                                store.write(new CacheEntryImpl<>(key, "val" + 
key));
+
+                                set.add(key);
+
+                                // 2 put operations and 1 load performed here.
+                                operations.incrementAndGet();
+                                operations.incrementAndGet();
+                                operations.incrementAndGet();
+
+                                break;
+                        }
+                    }
+                }
+                catch (Exception e) {
+                    error("Unexpected exception in put thread", e);
+
+                    assert false;
+                }
+            }
+        }, threadCnt, "put");
+
+        U.sleep(10000);
+
+        running.set(false);
+
+        fut.get();
+
+        log().info(">>> " + operations + " operations performed totally");
+
+        Set<Integer> total = new HashSet<>();
+
+        for (Set<Integer> threadVals : perThread.values()) {
+            total.addAll(threadVals);
+        }
+
+        return total;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
new file mode 100644
index 0000000..d4d6f02
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.configuration.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Basic store test.
+ */
+public abstract class GridCacheWriteBehindStoreAbstractTest extends 
GridCommonAbstractTest {
+    /** Flush frequency. */
+    private static final int WRITE_FROM_BEHIND_FLUSH_FREQUENCY = 1000;
+
+    /** Cache store. */
+    private static final GridCacheTestStore store = new GridCacheTestStore();
+
+    /**
+     * Passes {@code true} to the base-class constructor to request that a grid be started.
+     */
+    protected GridCacheWriteBehindStoreAbstractTest() {
+        super(true /*start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        store.resetTimestamp();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        IgniteCache<?, ?> cache = jcache();
+
+        if (cache != null)
+            cache.clear();
+
+        store.reset();
+    }
+
+    /** @return Caching mode. */
+    protected abstract CacheMode cacheMode();
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected final IgniteConfiguration getConfiguration() throws 
Exception {
+        IgniteConfiguration c = super.getConfiguration();
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        c.setDiscoverySpi(disco);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(cacheMode());
+        
cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cc.setSwapEnabled(false);
+        cc.setAtomicityMode(TRANSACTIONAL);
+
+        cc.setCacheStoreFactory(singletonFactory(store));
+        cc.setReadThrough(true);
+        cc.setWriteThrough(true);
+        cc.setLoadPreviousValue(true);
+
+        cc.setWriteBehindEnabled(true);
+        cc.setWriteBehindFlushFrequency(WRITE_FROM_BEHIND_FLUSH_FREQUENCY);
+
+        c.setCacheConfiguration(cc);
+
+        return c;
+    }
+
+    /** @throws Exception If test fails. */
+    public void testWriteThrough() throws Exception {
+        IgniteCache<Integer, String> cache = jcache();
+
+        Map<Integer, String> map = store.getMap();
+
+        assert map.isEmpty();
+
+        Transaction tx = grid().transactions().txStart(OPTIMISTIC, 
REPEATABLE_READ);
+
+        try {
+            for (int i = 1; i <= 10; i++) {
+                cache.put(i, Integer.toString(i));
+
+                checkLastMethod(null);
+            }
+
+            tx.commit();
+        }
+        finally {
+            tx.close();
+        }
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("putAll");
+
+        assert cache.size() == 10;
+
+        for (int i = 1; i <= 10; i++) {
+            String val = map.get(i);
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        store.resetLastMethod();
+
+        tx = grid().transactions().txStart();
+
+        try {
+            for (int i = 1; i <= 10; i++) {
+                String val = cache.getAndRemove(i);
+
+                checkLastMethod(null);
+
+                assert val != null;
+                assert val.equals(Integer.toString(i));
+            }
+
+            tx.commit();
+        }
+        finally {
+            tx.close();
+        }
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("removeAll");
+
+        assert map.isEmpty();
+    }
+
+    /** @throws Exception If test failed. */
+    public void testReadThrough() throws Exception {
+        IgniteCache<Integer, String> cache = jcache();
+
+        Map<Integer, String> map = store.getMap();
+
+        assert map.isEmpty();
+
+        try (Transaction tx = grid().transactions().txStart(OPTIMISTIC, 
REPEATABLE_READ)) {
+            for (int i = 1; i <= 10; i++)
+                cache.put(i, Integer.toString(i));
+
+            checkLastMethod(null);
+
+            tx.commit();
+        }
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("putAll");
+
+        for (int i = 1; i <= 10; i++) {
+            String val = map.get(i);
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        cache.clear();
+
+        assert cache.localSize() == 0;
+        assert cache.localSize() == 0;
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        assert map.size() == 10;
+
+        for (int i = 1; i <= 10; i++) {
+            // Read through.
+            String val = cache.get(i);
+
+            checkLastMethod("load");
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        assert cache.size() == 10;
+
+        cache.clear();
+
+        assert cache.localSize() == 0;
+        assert cache.localSize() == 0;
+
+        assert map.size() == 10;
+
+        Set<Integer> keys = new HashSet<>();
+
+        for (int i = 1; i <= 10; i++)
+            keys.add(i);
+
+        // Read through.
+        Map<Integer, String> vals = cache.getAll(keys);
+
+        checkLastMethod("loadAll");
+
+        assert vals != null;
+        assert vals.size() == 10;
+
+        for (int i = 1; i <= 10; i++) {
+            String val = vals.get(i);
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        // Write through.
+        cache.removeAll(keys);
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("removeAll");
+
+        assert cache.localSize() == 0;
+        assert cache.localSize() == 0;
+
+        assert map.isEmpty();
+    }
+
+    /** @throws Exception If failed. */
+    public void testMultithreaded() throws Exception {
+        final ConcurrentMap<String, Set<Integer>> perThread = new 
ConcurrentHashMap<>();
+
+        final AtomicBoolean running = new AtomicBoolean(true);
+
+        final IgniteCache<Integer, String> cache = jcache();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+            @SuppressWarnings({"NullableProblems"})
+            @Override public void run() {
+                // Initialize key set for this thread.
+                Set<Integer> set = new HashSet<>();
+
+                Set<Integer> old = 
perThread.putIfAbsent(Thread.currentThread().getName(), set);
+
+                if (old != null)
+                    set = old;
+
+                Random rnd = new Random();
+
+                int keyCnt = 20000;
+
+                while (running.get()) {
+                    int op = rnd.nextInt(2);
+                    int key = rnd.nextInt(keyCnt);
+
+                    switch (op) {
+                        case 0:
+                            cache.put(key, "val" + key);
+                            set.add(key);
+
+                            break;
+
+                        case 1:
+                        default:
+                            cache.remove(key);
+                            set.remove(key);
+
+                            break;
+                    }
+                }
+            }
+        }, 10, "put");
+
+        U.sleep(10000);
+
+        running.set(false);
+
+        fut.get();
+
+        U.sleep(5 * WRITE_FROM_BEHIND_FLUSH_FREQUENCY);
+
+        Map<Integer, String> stored = store.getMap();
+
+        for (Map.Entry<Integer, String> entry : stored.entrySet()) {
+            int key = entry.getKey();
+
+            assertEquals("Invalid value for key " + key, "val" + key, 
entry.getValue());
+
+            boolean found = false;
+
+            for (Set<Integer> threadPuts : perThread.values()) {
+                if (threadPuts.contains(key)) {
+                    found = true;
+
+                    break;
+                }
+            }
+
+            assert found : "No threads found that put key " + key;
+        }
+    }
+
+    /** @param mtd Expected last method value. */
+    private void checkLastMethod(@Nullable String mtd) {
+        String lastMtd = store.getLastMethod();
+
+        if (mtd == null)
+            assert lastMtd == null : "Last method must be null: " + lastMtd;
+        else {
+            assert lastMtd != null : "Last method must be not null";
+            assert lastMtd.equals(mtd) : "Last method does not match 
[expected=" + mtd + ", lastMtd=" + lastMtd + ']';
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java
new file mode 100644
index 0000000..2325fa6
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.cache.*;
+
+/**
+ * Tests {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} in grid configuration with a {@code LOCAL} cache.
+ */
+public class GridCacheWriteBehindStoreLocalTest extends 
GridCacheWriteBehindStoreAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.LOCAL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
new file mode 100644
index 0000000..3bcebb0
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Multithreaded tests for {@link 
org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}.
+ */
+public class GridCacheWriteBehindStoreMultithreadedSelfTest extends 
GridCacheWriteBehindStoreAbstractSelfTest {
+    /**
+     * This test performs complex set of operations on store from multiple 
threads.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutGetRemove() throws Exception {
+        initStore(2);
+
+        Set<Integer> exp;
+
+        try {
+            exp = runPutGetRemoveMultithreaded(10, 10);
+        }
+        finally {
+            shutdownStore();
+        }
+
+        Map<Integer, String> map = delegate.getMap();
+
+        Collection<Integer> extra = new HashSet<>(map.keySet());
+
+        extra.removeAll(exp);
+
+        assertTrue("The underlying store contains extra keys: " + extra, 
extra.isEmpty());
+
+        Collection<Integer> missing = new HashSet<>(exp);
+
+        missing.removeAll(map.keySet());
+
+        assertTrue("Missing keys in the underlying store: " + missing, 
missing.isEmpty());
+
+        for (Integer key : exp)
+            assertEquals("Invalid value for key " + key, "val" + key, 
map.get(key));
+    }
+
+    /**
+     * Tests that the cache keeps values if the underlying store fails.
+     *
+     * @throws Exception If failed.
+     */
+    public void testStoreFailure() throws Exception {
+        delegate.setShouldFail(true);
+
+        initStore(2);
+
+        Set<Integer> exp;
+
+        try {
+            exp = runPutGetRemoveMultithreaded(10, 10);
+
+            U.sleep(FLUSH_FREQUENCY);
+
+            info(">>> There are " + store.getWriteBehindErrorRetryCount() + " 
entries in RETRY state");
+
+            delegate.setShouldFail(false);
+
+            // Even though the shouldFail flag is now false, the flush thread may 
have just caught an exception.
+            // If we moved the store to the stopping state right away, that value 
would be lost. That's why this sleep
+            // is inserted here: to let all exception handlers in the write-behind 
store exit.
+            U.sleep(1000);
+        }
+        finally {
+            shutdownStore();
+        }
+
+        Map<Integer, String> map = delegate.getMap();
+
+        Collection<Integer> extra = new HashSet<>(map.keySet());
+
+        extra.removeAll(exp);
+
+        assertTrue("The underlying store contains extra keys: " + extra, 
extra.isEmpty());
+
+        Collection<Integer> missing = new HashSet<>(exp);
+
+        missing.removeAll(map.keySet());
+
+        assertTrue("Missing keys in the underlying store: " + missing, 
missing.isEmpty());
+
+        for (Integer key : exp)
+            assertEquals("Invalid value for key " + key, "val" + key, 
map.get(key));
+    }
+
+    /**
+     * Tests store consistency in case of high put rate, when flush is 
performed from the same thread
+     * as put or remove operation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFlushFromTheSameThread() throws Exception {
+        // 50 milliseconds should be enough.
+        delegate.setOperationDelay(50);
+
+        initStore(2);
+
+        Set<Integer> exp;
+
+        int start = store.getWriteBehindTotalCriticalOverflowCount();
+
+        try {
+            // We will have 5 * CACHE_SIZE keys in total, which should be enough 
to grow the map size to the critical value.
+            exp = runPutGetRemoveMultithreaded(5, CACHE_SIZE);
+        }
+        finally {
+            log.info(">>> Done inserting, shutting down the store");
+
+            shutdownStore();
+        }
+
+        // Restore delay.
+        delegate.setOperationDelay(0);
+
+        Map<Integer, String> map = delegate.getMap();
+
+        int end = store.getWriteBehindTotalCriticalOverflowCount();
+
+        log.info(">>> There are " + exp.size() + " keys in store, " + (end - 
start) + " overflows detected");
+
+        assertTrue("No cache overflows detected (a bug or too few keys or too 
few delay?)", end > start);
+
+        Collection<Integer> extra = new HashSet<>(map.keySet());
+
+        extra.removeAll(exp);
+
+        assertTrue("The underlying store contains extra keys: " + extra, 
extra.isEmpty());
+
+        Collection<Integer> missing = new HashSet<>(exp);
+
+        missing.removeAll(map.keySet());
+
+        assertTrue("Missing keys in the underlying store: " + missing, 
missing.isEmpty());
+
+        for (Integer key : exp)
+            assertEquals("Invalid value for key " + key, "val" + key, 
map.get(key));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
new file mode 100644
index 0000000..e9821fb
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Tests write-behind store with near and dht commit option.
+ */
+public class GridCacheWriteBehindStorePartitionedMultiNodeSelfTest extends 
GridCommonAbstractTest {
+    /** Grids to start. */
+    private static final int GRID_CNT = 5;
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Flush frequency. */
+    public static final int WRITE_BEHIND_FLUSH_FREQ = 1000;
+
+    /** Stores per grid. */
+    private GridCacheTestStore[] stores = new GridCacheTestStore[GRID_CNT];
+
+    /** Start grid counter. */
+    private int idx;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(CacheMode.PARTITIONED);
+        cc.setWriteBehindEnabled(true);
+        cc.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQ);
+        cc.setAtomicityMode(TRANSACTIONAL);
+        cc.setNearConfiguration(new NearCacheConfiguration());
+
+        CacheStore store = stores[idx] = new GridCacheTestStore();
+
+        cc.setCacheStoreFactory(singletonFactory(store));
+        cc.setReadThrough(true);
+        cc.setWriteThrough(true);
+        cc.setLoadPreviousValue(true);
+
+        c.setCacheConfiguration(cc);
+
+        idx++;
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stores = null;
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void prepare() throws Exception {
+        idx = 0;
+
+        startGrids(GRID_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSingleWritesOnDhtNode() throws Exception {
+        checkSingleWrites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBatchWritesOnDhtNode() throws Exception {
+        checkBatchWrites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxWritesOnDhtNode() throws Exception {
+        checkTxWrites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkSingleWrites() throws Exception {
+        prepare();
+
+        IgniteCache<Integer, String> cache = grid(0).cache(null);
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, String.valueOf(i));
+
+        checkWrites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkBatchWrites() throws Exception {
+        prepare();
+
+        Map<Integer, String> map = new HashMap<>();
+
+        for (int i = 0; i < 100; i++)
+            map.put(i, String.valueOf(i));
+
+        grid(0).cache(null).putAll(map);
+
+        checkWrites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkTxWrites() throws Exception {
+        prepare();
+
+        IgniteCache<Object, Object> cache = grid(0).cache(null);
+
+        try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+            for (int i = 0; i < 100; i++)
+                cache.put(i, String.valueOf(i));
+
+            tx.commit();
+        }
+
+        checkWrites();
+    }
+
+    /**
+     * @throws IgniteInterruptedCheckedException If sleep was interrupted.
+     */
+    private void checkWrites() throws IgniteInterruptedCheckedException {
+        U.sleep(WRITE_BEHIND_FLUSH_FREQ * 2);
+
+        Collection<Integer> allKeys = new ArrayList<>(100);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            Map<Integer,String> map = stores[i].getMap();
+
+            assertFalse("Missing writes for node: " + i, map.isEmpty());
+
+            allKeys.addAll(map.keySet());
+
+            // Check there is no intersection.
+            for (int j = 0; j < GRID_CNT; j++) {
+                if (i == j)
+                    continue;
+
+                Collection<Integer> intersection = new 
HashSet<>(stores[j].getMap().keySet());
+
+                intersection.retainAll(map.keySet());
+
+                assertTrue(intersection.isEmpty());
+            }
+        }
+
+        assertEquals(100, allKeys.size());
+
+        for (int i = 0; i < 100; i++)
+            assertTrue(allKeys.contains(i));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java
new file mode 100644
index 0000000..fe589ca
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.cache.*;
+
+/**
+ * Tests {@link 
org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} in 
partitioned configuration.
+ */
+public class GridCacheWriteBehindStorePartitionedTest extends 
GridCacheWriteBehindStoreAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java
new file mode 100644
index 0000000..26f8431
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.cache.*;
+
+/**
+ * Tests {@link 
org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} in 
replicated configuration.
+ */
+public class GridCacheWriteBehindStoreReplicatedTest extends 
GridCacheWriteBehindStoreAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
new file mode 100644
index 0000000..937e597
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * This class provides basic tests for {@link 
org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}.
+ */
+public class GridCacheWriteBehindStoreSelfTest extends 
GridCacheWriteBehindStoreAbstractSelfTest {
+    /**
+     * Tests correct store shutdown when the underlying store fails.
+     *
+     * @throws Exception If failed.
+     */
+    public void testShutdownWithFailure() throws Exception {
+        final AtomicReference<Exception> err = new AtomicReference<>();
+
+        multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    delegate.setShouldFail(true);
+
+                    initStore(2);
+
+                    try {
+                        store.write(new CacheEntryImpl<>(1, "val1"));
+                        store.write(new CacheEntryImpl<>(2, "val2"));
+                    }
+                    finally {
+                        shutdownStore();
+
+                        delegate.setShouldFail(false);
+                    }
+                }
+                catch (Exception e) {
+                    err.set(e);
+                }
+            }
+        }, 1).get();
+
+        if (err.get() != null)
+            throw err.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimpleStore() throws Exception {
+        initStore(2);
+
+        try {
+            store.write(new CacheEntryImpl<>(1, "v1"));
+            store.write(new CacheEntryImpl<>(2, "v2"));
+
+            assertEquals("v1", store.load(1));
+            assertEquals("v2", store.load(2));
+            assertNull(store.load(3));
+
+            store.delete(1);
+
+            assertNull(store.load(1));
+            assertEquals("v2", store.load(2));
+            assertNull(store.load(3));
+        }
+        finally {
+            shutdownStore();
+        }
+    }
+
+    /**
+     * Checks that all values written to the store end up in the underlying store 
after a timeout or due to size limits.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"NullableProblems"})
+    public void testValuePropagation() throws Exception {
+        // Need to test size-based write.
+        initStore(1);
+
+        try {
+            for (int i = 0; i < CACHE_SIZE * 2; i++)
+                store.write(new CacheEntryImpl<>(i, "val" + i));
+
+            U.sleep(200);
+
+            for (int i = 0; i < CACHE_SIZE; i++) {
+                String val = delegate.load(i);
+
+                assertNotNull("Value for [key= " + i + "] was not written in 
store", val);
+                assertEquals("Invalid value [key=" + i + "]", "val" + i, val);
+            }
+
+            U.sleep(FLUSH_FREQUENCY + 300);
+
+            for (int i = CACHE_SIZE; i < CACHE_SIZE * 2; i++) {
+                String val = delegate.load(i);
+
+                assertNotNull("Value for [key= " + i + "] was not written in 
store", val);
+                assertEquals("Invalid value [key=" + i + "]", "val" + i, val);
+            }
+        }
+        finally {
+            shutdownStore();
+        }
+    }
+
+    /**
+     * Tests store behaviour under continuous put of the same key with 
different values.
+     *
+     * @throws Exception If failed.
+     */
+    public void testContinuousPut() throws Exception {
+        initStore(2);
+
+        try {
+            final AtomicBoolean running = new AtomicBoolean(true);
+
+            final AtomicInteger actualPutCnt = new AtomicInteger();
+
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+                @SuppressWarnings({"NullableProblems"})
+                @Override public void run() {
+                    try {
+                        while (running.get()) {
+                            for (int i = 0; i < CACHE_SIZE; i++) {
+                                store.write(new CacheEntryImpl<>(i, "val-0"));
+
+                                actualPutCnt.incrementAndGet();
+
+                                store.write(new CacheEntryImpl<>(i, "val" + 
i));
+
+                                actualPutCnt.incrementAndGet();
+                            }
+                        }
+                    }
+                    catch (Exception e) {
+                        error("Unexpected exception in put thread", e);
+
+                        assert false;
+                    }
+                }
+            }, 1, "put");
+
+            U.sleep(FLUSH_FREQUENCY * 2 + 500);
+
+            int delegatePutCnt = delegate.getPutAllCount();
+
+            running.set(false);
+
+            fut.get();
+
+            log().info(">>> [putCnt = " + actualPutCnt.get() + ", 
delegatePutCnt=" + delegatePutCnt + "]");
+
+            assertTrue("No puts were made to the underlying store", 
delegatePutCnt > 0);
+            assertTrue("Too many puts were made to the underlying store", 
delegatePutCnt < actualPutCnt.get() / 10);
+        }
+        finally {
+            shutdownStore();
+        }
+
+        // These checks must be done after the store is shut down.
+        assertEquals("Invalid store size", CACHE_SIZE, 
delegate.getMap().size());
+
+        for (int i = 0; i < CACHE_SIZE; i++)
+            assertEquals("Invalid value stored", "val" + i, 
delegate.getMap().get(i));
+    }
+
+    /**
+     * Tests that all values put into the store are written to the 
underlying store
+     * after shutdown is called.
+     *
+     * @throws Exception If failed.
+     */
+    public void testShutdown() throws Exception {
+        initStore(2);
+
+        try {
+            final AtomicBoolean running = new AtomicBoolean(true);
+
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+                @SuppressWarnings({"NullableProblems"})
+                @Override public void run() {
+                    try {
+                        while (running.get()) {
+                            for (int i = 0; i < CACHE_SIZE; i++) {
+                                store.write(new CacheEntryImpl<>(i, "val-0"));
+
+                                store.write(new CacheEntryImpl<>(i, "val" + 
i));
+                            }
+                        }
+                    }
+                    catch (Exception e) {
+                        error("Unexpected exception in put thread", e);
+
+                        assert false;
+                    }
+                }
+            }, 1, "put");
+
+            U.sleep(300);
+
+            running.set(false);
+
+            fut.get();
+        }
+        finally {
+            shutdownStore();
+        }
+
+        // These checks must be done after the store is shut down.
+        assertEquals("Invalid store size", CACHE_SIZE, 
delegate.getMap().size());
+
+        for (int i = 0; i < CACHE_SIZE; i++)
+            assertEquals("Invalid value stored", "val" + i, 
delegate.getMap().get(i));
+    }
+
+    /**
+     * Tests that all values are written to the underlying store
+     * in exactly the same order as they were put into the store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBatchApply() throws Exception {
+        delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap<Integer, 
String>());
+
+        initStore(1);
+
+        List<Integer> intList = new ArrayList<>(CACHE_SIZE);
+
+        try {
+            for (int i = 0; i < CACHE_SIZE; i++) {
+                store.write(new CacheEntryImpl<>(i, "val" + i));
+
+                intList.add(i);
+            }
+        }
+        finally {
+            shutdownStore();
+        }
+
+        Map<Integer, String> underlyingMap = delegate.getMap();
+
+        assertTrue("Store map key set: " + underlyingMap.keySet(), 
F.eqOrdered(underlyingMap.keySet(), intList));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
 
b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 20a9caf..41f23ff 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.dr.*;
 import org.apache.ignite.internal.processors.cache.jta.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.plugin.*;
@@ -59,7 +60,7 @@ public class GridCacheTestContext<K, V> extends 
GridCacheContext<K, V> {
             true,
             new GridCacheEventManager(),
             new GridCacheSwapManager(false),
-            new GridCacheStoreManager(null,
+            new GridCacheOsStoreManager(null,
                 new IdentityHashMap<CacheStore, ThreadLocal>(),
                 null,
                 new CacheConfiguration()),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
index 452dbf1..529b227 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
@@ -19,9 +19,10 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 
 /**
- * Test suite that contains all tests for {@link GridCacheWriteBehindStore}.
+ * Test suite that contains all tests for {@link 
org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}.
  */
 public class IgniteCacheWriteBehindTestSuite extends TestSuite {
     /**

Reply via email to