# GG-9973: WIP.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/29377e9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/29377e9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/29377e9e

Branch: refs/heads/ignite-gg-9973
Commit: 29377e9e8417b1913e075460368d7ca022bec1c8
Parents: 9b23109
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Thu Apr 2 11:44:32 2015 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Thu Apr 2 11:44:32 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheMetricsImpl.java      |    1 +
 .../cache/CacheStorePartialUpdateException.java |    2 +-
 .../processors/cache/GridCacheContext.java      |    7 +-
 .../processors/cache/GridCacheProcessor.java    |    3 +-
 .../processors/cache/GridCacheStoreManager.java | 1202 ------------------
 .../cache/GridCacheWriteBehindStore.java        | 1015 ---------------
 .../cache/store/GridCacheOsStoreManager.java    | 1202 ++++++++++++++++++
 .../cache/store/GridCacheWriteBehindStore.java  | 1015 +++++++++++++++
 .../cache/transactions/IgniteTxAdapter.java     |    5 +-
 .../transactions/IgniteTxLocalAdapter.java      |    9 +-
 .../resources/META-INF/classnames.properties    |   14 +-
 .../cache/GridCachePartitionedWritesTest.java   |    3 +-
 ...idCacheWriteBehindStoreAbstractSelfTest.java |  189 ---
 .../GridCacheWriteBehindStoreAbstractTest.java  |  349 -----
 .../GridCacheWriteBehindStoreLocalTest.java     |   30 -
 ...heWriteBehindStoreMultithreadedSelfTest.java |  163 ---
 ...BehindStorePartitionedMultiNodeSelfTest.java |  215 ----
 ...ridCacheWriteBehindStorePartitionedTest.java |   30 -
 ...GridCacheWriteBehindStoreReplicatedTest.java |   30 -
 .../GridCacheWriteBehindStoreSelfTest.java      |  267 ----
 ...idCacheWriteBehindStoreAbstractSelfTest.java |  191 +++
 .../GridCacheWriteBehindStoreAbstractTest.java  |  350 +++++
 .../GridCacheWriteBehindStoreLocalTest.java     |   30 +
 ...heWriteBehindStoreMultithreadedSelfTest.java |  163 +++
 ...BehindStorePartitionedMultiNodeSelfTest.java |  216 ++++
 ...ridCacheWriteBehindStorePartitionedTest.java |   30 +
 ...GridCacheWriteBehindStoreReplicatedTest.java |   30 +
 .../GridCacheWriteBehindStoreSelfTest.java      |  268 ++++
 .../loadtests/hashmap/GridCacheTestContext.java |    3 +-
 .../IgniteCacheWriteBehindTestSuite.java        |    3 +-
 30 files changed, 3523 insertions(+), 3512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 446070e..deebab4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStorePartialUpdateException.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStorePartialUpdateException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStorePartialUpdateException.java
index 6a364da..f7fca59 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStorePartialUpdateException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStorePartialUpdateException.java
@@ -36,7 +36,7 @@ public class CacheStorePartialUpdateException extends 
IgniteCheckedException {
      * @param cause Cause.
      */
     @SuppressWarnings("unchecked")
-    CacheStorePartialUpdateException(Collection failedKeys, Exception cause) {
+    public CacheStorePartialUpdateException(Collection failedKeys, Exception 
cause) {
         super(cause);
 
         this.failedKeys = failedKeys;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index e7f3ad4..28ec91f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.jta.*;
 import org.apache.ignite.internal.processors.cache.local.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.cacheobject.*;
@@ -127,7 +128,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     private GridCacheTtlManager ttlMgr;
 
     /** Store manager. */
-    private GridCacheStoreManager storeMgr;
+    private GridCacheOsStoreManager storeMgr;
 
     /** Replication manager. */
     private GridCacheDrManager drMgr;
@@ -233,7 +234,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
 
         GridCacheEventManager evtMgr,
         GridCacheSwapManager swapMgr,
-        GridCacheStoreManager storeMgr,
+        GridCacheOsStoreManager storeMgr,
         GridCacheEvictionManager evictMgr,
         GridCacheQueryManager<K, V> qryMgr,
         CacheContinuousQueryManager contQryMgr,
@@ -951,7 +952,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     /**
      * @return Store manager.
      */
-    public GridCacheStoreManager store() {
+    public GridCacheOsStoreManager store() {
         return storeMgr;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 00852fc..5761bdb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.local.*;
 import org.apache.ignite.internal.processors.cache.local.atomic.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.plugin.*;
@@ -1023,7 +1024,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         CacheConflictResolutionManager rslvrMgr = 
pluginMgr.createComponent(CacheConflictResolutionManager.class);
         GridCacheDrManager drMgr = 
pluginMgr.createComponent(GridCacheDrManager.class);
 
-        GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, 
sesHolders, cfgStore, cfg);
+        GridCacheOsStoreManager storeMgr = new GridCacheOsStoreManager(ctx, 
sesHolders, cfgStore, cfg);
 
         GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
             ctx,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
deleted file mode 100644
index d30aa06..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
+++ /dev/null
@@ -1,1202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.store.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.lifecycle.*;
-import org.apache.ignite.transactions.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.integration.*;
-import java.util.*;
-
-/**
- * Store manager.
- */
-@SuppressWarnings("AssignmentToCatchBlockParameter")
-public class GridCacheStoreManager extends GridCacheManagerAdapter {
-    /** */
-    private static final UUID SES_ATTR = UUID.randomUUID();
-
-    /** */
-    private final CacheStore<Object, Object> store;
-
-    /** */
-    private final CacheStore<?, ?> cfgStore;
-
-    /** */
-    private final CacheStoreBalancingWrapper<Object, Object> singleThreadGate;
-
-    /** */
-    private final ThreadLocal<SessionData> sesHolder;
-
-    /** */
-    private final boolean locStore;
-
-    /** */
-    private final boolean writeThrough;
-
-    /** */
-    private boolean convertPortable;
-
-    /**
-     * @param ctx Kernal context.
-     * @param sesHolders Session holders map to use the same session holder 
for different managers if they use
-     *        the same store instance.
-     * @param cfgStore Store provided in configuration.
-     * @param cfg Cache configuration.
-     * @throws IgniteCheckedException In case of error.
-     */
-    @SuppressWarnings("unchecked")
-    public GridCacheStoreManager(
-        GridKernalContext ctx,
-        Map<CacheStore, ThreadLocal> sesHolders,
-        @Nullable CacheStore<Object, Object> cfgStore,
-        CacheConfiguration cfg
-    ) throws IgniteCheckedException {
-        this.cfgStore = cfgStore;
-
-        store = cacheStoreWrapper(ctx, cfgStore, cfg);
-
-        singleThreadGate = store == null ? null : new 
CacheStoreBalancingWrapper<>(store);
-
-        writeThrough = cfg.isWriteThrough();
-
-        ThreadLocal<SessionData> sesHolder0 = null;
-
-        if (cfgStore != null) {
-            sesHolder0 = sesHolders.get(cfgStore);
-
-            if (sesHolder0 == null) {
-                ThreadLocalSession locSes = new ThreadLocalSession();
-
-                if (ctx.resource().injectStoreSession(cfgStore, locSes)) {
-                    sesHolder0 = locSes.sesHolder;
-
-                    sesHolders.put(cfgStore, sesHolder0);
-                }
-            }
-        }
-
-        sesHolder = sesHolder0;
-
-        locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class);
-    }
-
-    /**
-     * @return {@code True} is write-through is enabled.
-     */
-    public boolean writeThrough() {
-        return writeThrough;
-    }
-
-    /**
-     * @return Unwrapped store provided in configuration.
-     */
-    public CacheStore<?, ?> configuredStore() {
-        return cfgStore;
-    }
-
-    /**
-     * Creates a wrapped cache store if write-behind cache is configured.
-     *
-     * @param ctx Kernal context.
-     * @param cfgStore Store provided in configuration.
-     * @param cfg Cache configuration.
-     * @return Instance if {@link GridCacheWriteBehindStore} if write-behind 
store is configured,
-     *         or user-defined cache store.
-     */
-    @SuppressWarnings({"unchecked"})
-    private CacheStore cacheStoreWrapper(GridKernalContext ctx,
-        @Nullable CacheStore cfgStore,
-        CacheConfiguration cfg) {
-        if (cfgStore == null || !cfg.isWriteBehindEnabled())
-            return cfgStore;
-
-        GridCacheWriteBehindStore store = new GridCacheWriteBehindStore(this,
-            ctx.gridName(),
-            cfg.getName(),
-            ctx.log(GridCacheWriteBehindStore.class),
-            cfgStore);
-
-        store.setFlushSize(cfg.getWriteBehindFlushSize());
-        store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount());
-        store.setFlushFrequency(cfg.getWriteBehindFlushFrequency());
-        store.setBatchSize(cfg.getWriteBehindBatchSize());
-
-        return store;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void start0() throws IgniteCheckedException {
-        if (store instanceof LifecycleAware) {
-            try {
-                // Avoid second start() call on store in case when near cache 
is enabled.
-                if (cctx.config().isWriteBehindEnabled()) {
-                    if (!cctx.isNear())
-                        ((LifecycleAware)store).start();
-                }
-            }
-            catch (Exception e) {
-                throw new IgniteCheckedException("Failed to start cache store: 
" + e, e);
-            }
-        }
-
-        convertPortable = 
!cctx.cacheObjects().keepPortableInStore(cctx.name());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void stop0(boolean cancel) {
-        if (store instanceof LifecycleAware) {
-            try {
-                // Avoid second start() call on store in case when near cache 
is enabled.
-                if (cctx.config().isWriteBehindEnabled()) {
-                    if (!cctx.isNear())
-                        ((LifecycleAware)store).stop();
-                }
-            }
-            catch (Exception e) {
-                U.error(log(), "Failed to stop cache store.", e);
-            }
-        }
-    }
-
-    /**
-     * @return Convert-portable flag.
-     */
-    public boolean convertPortable() {
-        return convertPortable;
-    }
-
-    /**
-     * @param convertPortable Convert-portable flag.
-     */
-    public void convertPortable(boolean convertPortable) {
-        this.convertPortable = convertPortable;
-    }
-
-    /**
-     * @return {@code true} If local store is configured.
-     */
-    public boolean isLocalStore() {
-        return locStore;
-    }
-
-    /**
-     * @return {@code true} If store configured.
-     */
-    public boolean configured() {
-        return store != null;
-    }
-
-    /**
-     * Loads data from persistent store.
-     *
-     * @param tx Cache transaction.
-     * @param key Cache key.
-     * @return Loaded value, possibly <tt>null</tt>.
-     * @throws IgniteCheckedException If data loading failed.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable public Object loadFromStore(@Nullable IgniteInternalTx tx, 
KeyCacheObject key)
-        throws IgniteCheckedException {
-        return loadFromStore(tx, key, true);
-    }
-
-    /**
-     * Loads data from persistent store.
-     *
-     * @param tx Cache transaction.
-     * @param key Cache key.
-     * @param convert Convert flag.
-     * @return Loaded value, possibly <tt>null</tt>.
-     * @throws IgniteCheckedException If data loading failed.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable private Object loadFromStore(@Nullable IgniteInternalTx tx,
-        KeyCacheObject key,
-        boolean convert)
-        throws IgniteCheckedException {
-        if (store != null) {
-            if (key.internal())
-                // Never load internal keys from store as they are never 
persisted.
-                return null;
-
-            Object storeKey = key.value(cctx.cacheObjectContext(), false);
-
-            if (convertPortable)
-                storeKey = cctx.unwrapPortableIfNeeded(storeKey, false);
-
-            if (log.isDebugEnabled())
-                log.debug("Loading value from store for key: " + storeKey);
-
-            initSession(tx);
-
-            boolean thewEx = true;
-
-            Object val = null;
-
-            try {
-                val = singleThreadGate.load(storeKey);
-
-                thewEx = false;
-            }
-            catch (ClassCastException e) {
-                handleClassCastException(e);
-            }
-            catch (CacheLoaderException e) {
-                throw new IgniteCheckedException(e);
-            }
-            catch (Exception e) {
-                throw new IgniteCheckedException(new CacheLoaderException(e));
-            }
-            finally {
-                endSession(tx, thewEx);
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Loaded value from store [key=" + key + ", val=" + 
val + ']');
-
-            if (convert) {
-                val = convert(val);
-
-                return val;
-            }
-            else
-                return val;
-        }
-
-        return null;
-    }
-
-    /**
-     * @param val Internal value.
-     * @return User value.
-     */
-    @SuppressWarnings("unchecked")
-    private Object convert(Object val) {
-        if (val == null)
-            return null;
-
-        return locStore ? ((IgniteBiTuple<Object, 
GridCacheVersion>)val).get1() : val;
-    }
-
-    /**
-     * @return Whether DHT transaction can write to store from DHT.
-     */
-    public boolean writeToStoreFromDht() {
-        return cctx.config().isWriteBehindEnabled() || locStore;
-    }
-
-    /**
-     * @param tx Cache transaction.
-     * @param keys Cache keys.
-     * @param vis Closure to apply for loaded elements.
-     * @throws IgniteCheckedException If data loading failed.
-     */
-    public void localStoreLoadAll(@Nullable IgniteInternalTx tx,
-        Collection<? extends KeyCacheObject> keys,
-        final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis)
-        throws IgniteCheckedException {
-        assert store != null;
-        assert locStore;
-
-        loadAllFromStore(tx, keys, null, vis);
-    }
-
-    /**
-     * Loads data from persistent store.
-     *
-     * @param tx Cache transaction.
-     * @param keys Cache keys.
-     * @param vis Closure.
-     * @return {@code True} if there is a persistent storage.
-     * @throws IgniteCheckedException If data loading failed.
-     */
-    @SuppressWarnings({"unchecked"})
-    public boolean loadAllFromStore(@Nullable IgniteInternalTx tx,
-        Collection<? extends KeyCacheObject> keys,
-        final IgniteBiInClosure<KeyCacheObject, Object> vis) throws 
IgniteCheckedException {
-        if (store != null) {
-            loadAllFromStore(tx, keys, vis, null);
-
-            return true;
-        }
-        else {
-            for (KeyCacheObject key : keys)
-                vis.apply(key, null);
-        }
-
-        return false;
-    }
-
-    /**
-     * @param tx Cache transaction.
-     * @param keys Keys to load.
-     * @param vis Key/value closure (only one of vis or verVis can be 
specified).
-     * @param verVis Key/value/version closure (only one of vis or verVis can 
be specified).
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    private void loadAllFromStore(@Nullable IgniteInternalTx tx,
-        Collection<? extends KeyCacheObject> keys,
-        @Nullable final IgniteBiInClosure<KeyCacheObject, Object> vis,
-        @Nullable final GridInClosure3<KeyCacheObject, Object, 
GridCacheVersion> verVis)
-        throws IgniteCheckedException {
-        assert vis != null ^ verVis != null;
-        assert verVis == null || locStore;
-
-        final boolean convert = verVis == null;
-
-        if (!keys.isEmpty()) {
-            if (keys.size() == 1) {
-                KeyCacheObject key = F.first(keys);
-
-                if (convert)
-                    vis.apply(key, loadFromStore(tx, key));
-                else {
-                    IgniteBiTuple<Object, GridCacheVersion> t =
-                        (IgniteBiTuple<Object, 
GridCacheVersion>)loadFromStore(tx, key, false);
-
-                    if (t != null)
-                        verVis.apply(key, t.get1(), t.get2());
-                }
-
-                return;
-            }
-
-            Collection<Object> keys0;
-
-            if (convertPortable) {
-                keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() {
-                    @Override public Object apply(KeyCacheObject key) {
-                        return 
cctx.unwrapPortableIfNeeded(key.value(cctx.cacheObjectContext(), false), false);
-                    }
-                });
-            }
-            else {
-                keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() {
-                    @Override public Object apply(KeyCacheObject key) {
-                        return key.value(cctx.cacheObjectContext(), false);
-                    }
-                });
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Loading values from store for keys: " + keys0);
-
-            initSession(tx);
-
-            boolean thewEx = true;
-
-            try {
-                IgniteBiInClosure<Object, Object> c = new CI2<Object, 
Object>() {
-                    @SuppressWarnings("ConstantConditions")
-                    @Override public void apply(Object k, Object val) {
-                        if (convert) {
-                            Object v = convert(val);
-
-                            vis.apply(cctx.toCacheKeyObject(k), v);
-                        }
-                        else {
-                            IgniteBiTuple<Object, GridCacheVersion> v = 
(IgniteBiTuple<Object, GridCacheVersion>)val;
-
-                            if (v != null)
-                                verVis.apply(cctx.toCacheKeyObject(k), 
v.get1(), v.get2());
-                        }
-                    }
-                };
-
-                if (keys.size() > singleThreadGate.loadAllThreshold()) {
-                    Map<Object, Object> map = store.loadAll(keys0);
-
-                    if (map != null) {
-                        for (Map.Entry<Object, Object> e : map.entrySet())
-                            c.apply(cctx.toCacheKeyObject(e.getKey()), 
e.getValue());
-                    }
-                }
-                else
-                    singleThreadGate.loadAll(keys0, c);
-
-                thewEx = false;
-            }
-            catch (ClassCastException e) {
-                handleClassCastException(e);
-            }
-            catch (CacheLoaderException e) {
-                throw new IgniteCheckedException(e);
-            }
-            catch (Exception e) {
-                throw new IgniteCheckedException(new CacheLoaderException(e));
-            }
-            finally {
-                endSession(tx, thewEx);
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Loaded values from store for keys: " + keys0);
-        }
-    }
-
-    /**
-     * Loads data from persistent store.
-     *
-     * @param vis Closer to cache loaded elements.
-     * @param args User arguments.
-     * @return {@code True} if there is a persistent storage.
-     * @throws IgniteCheckedException If data loading failed.
-     */
-    @SuppressWarnings({"ErrorNotRethrown", "unchecked"})
-    public boolean loadCache(final GridInClosure3<KeyCacheObject, Object, 
GridCacheVersion> vis, Object[] args)
-        throws IgniteCheckedException {
-        if (store != null) {
-            if (log.isDebugEnabled())
-                log.debug("Loading all values from store.");
-
-            initSession(null);
-
-            boolean thewEx = true;
-
-            try {
-                store.loadCache(new IgniteBiInClosure<Object, Object>() {
-                    @Override public void apply(Object k, Object o) {
-                        Object v;
-                        GridCacheVersion ver = null;
-
-                        if (locStore) {
-                            IgniteBiTuple<Object, GridCacheVersion> t = 
(IgniteBiTuple<Object, GridCacheVersion>)o;
-
-                            v = t.get1();
-                            ver = t.get2();
-                        }
-                        else
-                            v = o;
-
-                        KeyCacheObject cacheKey = cctx.toCacheKeyObject(k);
-
-                        vis.apply(cacheKey, v, ver);
-                    }
-                }, args);
-
-                thewEx = false;
-            }
-            catch (CacheLoaderException e) {
-                throw new IgniteCheckedException(e);
-            }
-            catch (Exception e) {
-                throw new IgniteCheckedException(new CacheLoaderException(e));
-            }
-            finally {
-                endSession(null, thewEx);
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Loaded all values from store.");
-
-            return true;
-        }
-
-        LT.warn(log, null, "Calling Cache.loadCache() method will have no 
effect, " +
-            "CacheConfiguration.getStore() is not defined for cache: " + 
cctx.namexx());
-
-        return false;
-    }
-
-    /**
-     * Puts key-value pair into storage.
-     *
-     * @param tx Cache transaction.
-     * @param key Key.
-     * @param val Value.
-     * @param ver Version.
-     * @return {@code true} If there is a persistent storage.
-     * @throws IgniteCheckedException If storage failed.
-     */
-    @SuppressWarnings("unchecked")
-    public boolean putToStore(@Nullable IgniteInternalTx tx, Object key, 
Object val, GridCacheVersion ver)
-        throws IgniteCheckedException {
-        if (store != null) {
-            // Never persist internal keys.
-            if (key instanceof GridCacheInternal)
-                return true;
-
-            if (convertPortable) {
-                key = cctx.unwrapPortableIfNeeded(key, false);
-                val = cctx.unwrapPortableIfNeeded(val, false);
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Storing value in cache store [key=" + key + ", 
val=" + val + ']');
-
-            initSession(tx);
-
-            boolean thewEx = true;
-
-            try {
-                store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) 
: val));
-
-                thewEx = false;
-            }
-            catch (ClassCastException e) {
-                handleClassCastException(e);
-            }
-            catch (CacheWriterException e) {
-                throw new IgniteCheckedException(e);
-            }
-            catch (Exception e) {
-                throw new IgniteCheckedException(new CacheWriterException(e));
-            }
-            finally {
-                endSession(tx, thewEx);
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Stored value in cache store [key=" + key + ", val=" 
+ val + ']');
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Puts key-value pair into storage.
-     *
-     * @param tx Cache transaction.
-     * @param map Map.
-     * @return {@code True} if there is a persistent storage.
-     * @throws IgniteCheckedException If storage failed.
-     */
-    public boolean putAllToStore(@Nullable IgniteInternalTx tx,
-        Map<Object, IgniteBiTuple<Object, GridCacheVersion>> map)
-        throws IgniteCheckedException
-    {
-        if (F.isEmpty(map))
-            return true;
-
-        if (map.size() == 1) {
-            Map.Entry<Object, IgniteBiTuple<Object, GridCacheVersion>> e = 
map.entrySet().iterator().next();
-
-            return putToStore(tx, e.getKey(), e.getValue().get1(), 
e.getValue().get2());
-        }
-        else {
-            if (store != null) {
-                EntriesView entries = new EntriesView((Map)map);
-
-                if (log.isDebugEnabled())
-                    log.debug("Storing values in cache store [entries=" + 
entries + ']');
-
-                initSession(tx);
-
-                boolean thewEx = true;
-
-                try {
-                    store.writeAll(entries);
-
-                    thewEx = false;
-                }
-                catch (ClassCastException e) {
-                    handleClassCastException(e);
-                }
-                catch (Exception e) {
-                    if (!(e instanceof CacheWriterException))
-                        e = new CacheWriterException(e);
-
-                    if (!entries.isEmpty()) {
-                        List<Object> keys = new ArrayList<>(entries.size());
-
-                        for (Cache.Entry<?, ?> entry : entries)
-                            keys.add(entry.getKey());
-
-                        throw new CacheStorePartialUpdateException(keys, e);
-                    }
-
-                    throw new IgniteCheckedException(e);
-                }
-                finally {
-                    endSession(tx, thewEx);
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Stored value in cache store [entries=" + 
entries + ']');
-
-                return true;
-            }
-
-            return false;
-        }
-    }
-
-    /**
-     * Deletes a single key from the underlying store, if one is configured.
-     *
-     * @param tx Cache transaction, {@code null} for a non-transactional operation.
-     * @param key Key to delete.
-     * @return {@code True} if there is a persistent storage and the key was passed to it,
-     *      {@code false} if there is no store or the key is an internal one.
-     * @throws IgniteCheckedException If storage failed.
-     */
-    @SuppressWarnings("unchecked")
-    public boolean removeFromStore(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException {
-        if (store == null)
-            return false;
-
-        // Internal keys are never persisted, so there is nothing to remove for them.
-        if (key instanceof GridCacheInternal)
-            return false;
-
-        Object storeKey = convertPortable ? cctx.unwrapPortableIfNeeded(key, false) : key;
-
-        if (log.isDebugEnabled())
-            log.debug("Removing value from cache store [key=" + storeKey + ']');
-
-        initSession(tx);
-
-        boolean threwEx = true;
-
-        try {
-            store.delete(storeKey);
-
-            threwEx = false;
-        }
-        catch (ClassCastException e) {
-            handleClassCastException(e);
-        }
-        catch (Exception e) {
-            // Anything that is not already a CacheWriterException gets wrapped into one first.
-            throw new IgniteCheckedException(e instanceof CacheWriterException ? e : new CacheWriterException(e));
-        }
-        finally {
-            endSession(tx, threwEx);
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Removed value from cache store [key=" + storeKey + ']');
-
-        return true;
-    }
-
-    /**
-     * Deletes a batch of keys from the underlying store, if one is configured. A single-key batch is
-     * delegated to {@link #removeFromStore(IgniteInternalTx, Object)}.
-     *
-     * @param tx Cache transaction, {@code null} for a non-transactional operation.
-     * @param keys Keys to delete.
-     * @return {@code True} if {@code keys} is empty or there is a persistent storage.
-     * @throws IgniteCheckedException If storage failed.
-     */
-    @SuppressWarnings("unchecked")
-    public boolean removeAllFromStore(@Nullable IgniteInternalTx tx, Collection<Object> keys)
-        throws IgniteCheckedException {
-        if (F.isEmpty(keys))
-            return true;
-
-        if (keys.size() == 1)
-            return removeFromStore(tx, keys.iterator().next());
-
-        if (store == null)
-            return false;
-
-        Collection<Object> storeKeys = keys;
-
-        if (convertPortable)
-            storeKeys = cctx.unwrapPortablesIfNeeded(keys, false);
-
-        if (log.isDebugEnabled())
-            log.debug("Removing values from cache store [keys=" + storeKeys + ']');
-
-        initSession(tx);
-
-        boolean threwEx = true;
-
-        try {
-            store.deleteAll(storeKeys);
-
-            threwEx = false;
-        }
-        catch (ClassCastException e) {
-            handleClassCastException(e);
-        }
-        catch (Exception e) {
-            Exception cause = e instanceof CacheWriterException ? e : new CacheWriterException(e);
-
-            if (storeKeys.isEmpty())
-                throw new IgniteCheckedException(cause);
-
-            // Keys still present in the collection are reported as not removed.
-            throw new CacheStorePartialUpdateException(storeKeys, cause);
-        }
-        finally {
-            endSession(tx, threwEx);
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Removed values from cache store [keys=" + storeKeys + ']');
-
-        return true;
-    }
-
-    /**
-     * Gets the store instance this manager delegates to.
-     *
-     * @return Store, may be {@code null} (other methods of this class check it for {@code null}).
-     */
-    public CacheStore<Object, Object> store() {
-        return store;
-    }
-
-    /**
-     * Asks the store to flush its pending updates. Only has an effect when the store is a
-     * {@link GridCacheWriteBehindStore}; for any other store this is a no-op.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void forceFlush() throws IgniteCheckedException {
-        if (store instanceof GridCacheWriteBehindStore)
-            ((GridCacheWriteBehindStore)store).forceFlush();
-    }
-
-    /**
-     * Notifies the store that the given transaction has ended and drops the session data
-     * associated with it: the session holder is reset and the session attribute is removed
-     * from the transaction, even if the store call throws.
-     *
-     * @param tx Transaction.
-     * @param commit {@code True} if the transaction is committed, {@code false} if it is rolled back.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void txEnd(IgniteInternalTx tx, boolean commit) throws 
IgniteCheckedException {
-        assert store != null;
-
-        initSession(tx);
-
-        try {
-            store.sessionEnd(commit);
-        }
-        finally {
-            if (sesHolder != null) {
-                sesHolder.set(null);
-
-                tx.removeMeta(SES_ATTR);
-            }
-        }
-    }
-
-    /**
-     * Rethrows a {@link ClassCastException} raised by the store. An exception that carries a message is
-     * wrapped into an {@link IgniteCheckedException} hinting at portable objects support; an exception
-     * without a message is rethrown as is. This method never returns normally.
-     *
-     * @param e Class cast exception.
-     * @throws IgniteCheckedException Thrown exception.
-     */
-    private void handleClassCastException(ClassCastException e) throws IgniteCheckedException {
-        assert e != null;
-
-        if (e.getMessage() == null)
-            throw e;
-
-        throw new IgniteCheckedException("Cache store must work with portable objects if portables are " +
-            "enabled for cache [cacheName=" + cctx.namex() + ']', e);
-    }
-
-    /**
-     * Ends the store session for the current thread and clears the session holder.
-     * <p>
-     * For a non-transactional operation ({@code tx == null}) the store is notified through
-     * {@code sessionEnd(commit)}, where {@code commit} is {@code true} only if the store operation
-     * succeeded. For a transactional operation the store is notified later, from
-     * {@link #txEnd(IgniteInternalTx, boolean)}.
-     *
-     * @param tx Current transaction, {@code null} for a non-transactional operation.
-     * @param threwEx {@code True} if the store operation that used this session threw an exception.
-     * @throws IgniteCheckedException If ending the session failed while the store operation itself succeeded.
-     */
-    void endSession(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException {
-        try {
-            if (tx == null)
-                // sessionEnd() expects a 'commit' flag, which is the opposite of 'threwEx'.
-                store.sessionEnd(!threwEx);
-        }
-        catch (Exception e) {
-            // Do not mask the original store failure with a secondary failure of sessionEnd().
-            if (!threwEx)
-                throw U.cast(e);
-        }
-        finally {
-            if (sesHolder != null)
-                sesHolder.set(null);
-        }
-    }
-
-    /**
-     * Binds session data to the current thread before a store call. Does nothing when there is no
-     * session holder. For a transactional call the session data is kept in transaction metadata so
-     * that it is shared by all store calls of that transaction.
-     *
-     * @param tx Current transaction, {@code null} for a non-transactional operation.
-     */
-    void initSession(@Nullable IgniteInternalTx tx) {
-        if (sesHolder == null)
-            return;
-
-        assert sesHolder.get() == null;
-
-        SessionData ses;
-
-        if (tx == null)
-            ses = new SessionData(null, cctx.name());
-        else {
-            ses = tx.meta(SES_ATTR);
-
-            if (ses != null)
-                // Session cache name may change in cross-cache transaction.
-                ses.cacheName(cctx.name());
-            else {
-                ses = new SessionData(tx, cctx.name());
-
-                tx.addMeta(SES_ATTR, ses);
-            }
-        }
-
-        sesHolder.set(ses);
-    }
-
-    /**
-     * Per-session state: the transaction the session belongs to (if any), the name of the cache
-     * currently using the session and a lazily created map of session properties.
-     */
-    private static class SessionData {
-        /** Transaction this session belongs to, {@code null} for a non-transactional session. */
-        @GridToStringExclude
-        private final IgniteInternalTx tx;
-
-        /** Name of the cache currently using the session; mutable, see {@link #cacheName(String)}. */
-        private String cacheName;
-
-        /** Session properties, created on first access. */
-        @GridToStringInclude
-        private Map<Object, Object> props;
-
-        /**
-         * @param tx Current transaction.
-         * @param cacheName Cache name.
-         */
-        private SessionData(@Nullable IgniteInternalTx tx, @Nullable String 
cacheName) {
-            this.tx = tx;
-            this.cacheName = cacheName;
-        }
-
-        /**
-         * @return Proxy of the transaction, or {@code null} if the session is not transactional.
-         */
-        @Nullable private Transaction transaction() {
-            return tx != null ? tx.proxy() : null;
-        }
-
-        /**
-         * @return Properties map; created on the first call, never {@code null}.
-         */
-        private Map<Object, Object> properties() {
-            if (props == null)
-                props = new GridLeanMap<>();
-
-            return props;
-        }
-
-        /**
-         * @return Cache name.
-         */
-        private String cacheName() {
-            return cacheName;
-        }
-
-        /**
-         * @param cacheName Cache name.
-         */
-        private void cacheName(String cacheName) {
-            this.cacheName = cacheName;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(SessionData.class, this, "tx", CU.txString(tx));
-        }
-    }
-
-    /**
-     * {@link CacheStoreSession} implementation that resolves every call against the
-     * {@link SessionData} bound to the calling thread. When no session data is bound,
-     * all accessors return {@code null} (or {@code false}).
-     */
-    private static class ThreadLocalSession implements CacheStoreSession {
-        /** Session data of the current thread. */
-        private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>();
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Transaction transaction() {
-            SessionData ses0 = sesHolder.get();
-
-            return ses0 != null ? ses0.transaction() : null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isWithinTransaction() {
-            return transaction() != null;
-        }
-
-        /**
-         * {@inheritDoc}
-         * <p>
-         * Returns {@code null} when no session data is bound to the current thread.
-         */
-        @SuppressWarnings("unchecked")
-        @Override public <K1, V1> Map<K1, V1> properties() {
-            SessionData ses0 = sesHolder.get();
-
-            return ses0 != null ? (Map<K1, V1>)ses0.properties() : null;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String cacheName() {
-            SessionData ses0 = sesHolder.get();
-
-            return ses0 != null ? ses0.cacheName() : null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ThreadLocalSession.class, this);
-        }
-    }
-
-    /**
-     * Collection view that presents the entries of a key to (value, version) map as {@link Cache.Entry}
-     * objects without copying the map. Removals made through the view are recorded in a side set
-     * ({@code rmvd}) or by the {@code cleared} flag and never modify the backing map. Adding is not supported.
-     */
-    @SuppressWarnings("unchecked")
-    private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> {
-        /** Backing map. */
-        private final Map<?, IgniteBiTuple<?, GridCacheVersion>> map;
-
-        /** Keys removed through this view, created on first removal. */
-        private Set<Object> rmvd;
-
-        /** Set once {@link #clear()} is called; the view is empty from then on. */
-        private boolean cleared;
-
-        /**
-         * @param map Map.
-         */
-        private EntriesView(Map<?, IgniteBiTuple<?, GridCacheVersion>> map) {
-            assert map != null;
-
-            this.map = map;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            return cleared ? 0 : (map.size() - (rmvd != null ? rmvd.size() : 0));
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isEmpty() {
-            return cleared || !iterator().hasNext();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean contains(Object o) {
-            if (cleared || !(o instanceof Cache.Entry))
-                return false;
-
-            Object key = ((Cache.Entry<?, ?>)o).getKey();
-
-            // An entry removed through this view must no longer be reported as contained.
-            if (rmvd != null && rmvd.contains(key))
-                return false;
-
-            return map.containsKey(key);
-        }
-
-        /** {@inheritDoc} */
-        @NotNull @Override public Iterator<Cache.Entry<?, ?>> iterator() {
-            if (cleared)
-                return F.emptyIterator();
-
-            final Iterator<Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>>> it0 =
-                (Iterator)map.entrySet().iterator();
-
-            return new Iterator<Cache.Entry<?, ?>>() {
-                /** Entry returned by the last {@code next()} call. */
-                private Cache.Entry<?, ?> cur;
-
-                /** Entry to be returned by the next {@code next()} call, {@code null} if exhausted. */
-                private Cache.Entry<?, ?> next;
-
-                /**
-                 * Positions the iterator on the first entry that was not removed.
-                 */
-                {
-                    checkNext();
-                }
-
-                /**
-                 * Advances to the next backing entry that was not removed, if any.
-                 */
-                private void checkNext() {
-                    while (it0.hasNext()) {
-                        Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>> e = it0.next();
-
-                        Object k = e.getKey();
-
-                        // NOTE(review): 'rmvd' is matched against the raw map key here, while removal records
-                        // the key of the (possibly unwrapped) view entry - confirm they agree when
-                        // 'convertPortable' is set.
-                        if (rmvd != null && rmvd.contains(k))
-                            continue;
-
-                        Object v = locStore ? e.getValue() : e.getValue().get1();
-
-                        if (convertPortable) {
-                            k = cctx.unwrapPortableIfNeeded(k, false);
-                            v = cctx.unwrapPortableIfNeeded(v, false);
-                        }
-
-                        next = new CacheEntryImpl<>(k, v);
-
-                        break;
-                    }
-                }
-
-                @Override public boolean hasNext() {
-                    return next != null;
-                }
-
-                @Override public Cache.Entry<?, ?> next() {
-                    if (next == null)
-                        throw new NoSuchElementException();
-
-                    cur = next;
-
-                    next = null;
-
-                    checkNext();
-
-                    return cur;
-                }
-
-                @Override public void remove() {
-                    if (cur == null)
-                        throw new IllegalStateException();
-
-                    addRemoved(cur);
-
-                    cur = null;
-                }
-            };
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean add(Cache.Entry<?, ?> entry) {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean addAll(Collection<? extends Cache.Entry<?, ?>> col) {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean remove(Object o) {
-            if (cleared || !(o instanceof Cache.Entry))
-                return false;
-
-            Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o;
-
-            if (rmvd != null && rmvd.contains(e.getKey()))
-                return false;
-
-            if (mapContains(e)) {
-                addRemoved(e);
-
-                return true;
-            }
-
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean containsAll(Collection<?> col) {
-            // True only if every element is contained (vacuously true for an empty collection).
-            for (Object o : col) {
-                if (!contains(o))
-                    return false;
-            }
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean removeAll(Collection<?> col) {
-            if (cleared)
-                return false;
-
-            boolean modified = false;
-
-            for (Object o : col) {
-                if (remove(o))
-                    modified = true;
-            }
-
-            return modified;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean retainAll(Collection<?> col) {
-            if (cleared)
-                return false;
-
-            boolean modified = false;
-
-            for (Cache.Entry<?, ?> e : this) {
-                if (!col.contains(e)) {
-                    addRemoved(e);
-
-                    modified = true;
-                }
-            }
-
-            return modified;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void clear() {
-            cleared = true;
-        }
-
-        /**
-         * Records the key of the given entry as removed from this view.
-         *
-         * @param e Entry.
-         */
-        private void addRemoved(Cache.Entry<?, ?> e) {
-            if (rmvd == null)
-                rmvd = new HashSet<>();
-
-            rmvd.add(e.getKey());
-        }
-
-        /**
-         * @param e Entry.
-         * @return {@code True} if original map contains entry.
-         */
-        private boolean mapContains(Cache.Entry<?, ?> e) {
-            return map.containsKey(e.getKey());
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            Iterator<Cache.Entry<?, ?>> it = iterator();
-
-            if (!it.hasNext())
-                return "[]";
-
-            SB sb = new SB("[");
-
-            while (true) {
-                Cache.Entry<?, ?> e = it.next();
-
-                sb.a(e.toString());
-
-                if (!it.hasNext())
-                    return sb.a(']').toString();
-
-                sb.a(", ");
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java
deleted file mode 100644
index c26c34e..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java
+++ /dev/null
@@ -1,1015 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.lifecycle.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-import org.jsr166.*;
-
-import javax.cache.integration.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-import static javax.cache.Cache.*;
-
-/**
- * Internal wrapper for a {@link CacheStore} that enables write-behind logic.
- * <p/>
- * The general purpose of this approach is to reduce cache store load under 
high
- * store update rate. The idea is to cache all write and remove operations in 
a pending
- * map and delegate these changes to the underlying store either after timeout 
or
- * if size of a pending map exceeded some pre-configured value. Another 
performance gain
- * is achieved due to combining a group of similar operations to a single 
batch update.
- * <p/>
- * The essential flush size for the write-behind cache should be at least the 
estimated
- * count of simultaneously written keys. In case of significantly smaller 
value there would
- * be triggered a lot of flush events that will result in a high cache store 
load.
- * <p/>
- * Since write operations to the cache store are deferred, transaction support 
is lost; no
- * transaction objects are passed to the underlying store.
- */
-public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, 
LifecycleAware {
-    /** Default write cache initial capacity. */
-    public static final int DFLT_INITIAL_CAPACITY = 1024;
-
-    /** Overflow ratio for critical cache size calculation. */
-    public static final float CACHE_OVERFLOW_RATIO = 1.5f;
-
-    /** Default concurrency level of write cache. */
-    public static final int DFLT_CONCUR_LVL = 64;
-
-    /** Write cache initial capacity. */
-    private int initCap = DFLT_INITIAL_CAPACITY;
-
-    /** Concurrency level for write cache access. */
-    private int concurLvl = DFLT_CONCUR_LVL;
-
-    /** When cache size exceeds this value eldest entry will be stored to the 
underlying store. */
-    private int cacheMaxSize = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE;
-
-    /** Critical cache size. If cache size exceeds this value, data flush 
performed synchronously. */
-    private int cacheCriticalSize;
-
-    /** Count of worker threads performing underlying store updates. */
-    private int flushThreadCnt = 
CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT;
-
-    /** Cache flush frequency. All pending operations will be performed in not less than this value (ms). */
-    private long cacheFlushFreq = 
CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY;
-
-    /** Maximum batch size for put and remove operations */
-    private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE;
-
-    /** Grid name. */
-    private String gridName;
-
-    /** Cache name. */
-    private String cacheName;
-
-    /** Underlying store. */
-    private CacheStore<K, V> store;
-
-    /** Write cache. */
-    private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache;
-
-    /** Flusher threads. */
-    private GridWorker[] flushThreads;
-
-    /** Atomic flag indicating store shutdown. */
-    private AtomicBoolean stopping = new AtomicBoolean(true);
-
-    /** Flush lock. */
-    private Lock flushLock = new ReentrantLock();
-
-    /** Condition to determine records available for flush. */
-    private Condition canFlush = flushLock.newCondition();
-
-    /** Variable for counting total cache overflows. */
-    private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger();
-
-    /** Variable contains current number of overflow events. */
-    private AtomicInteger cacheOverflowCntr = new AtomicInteger();
-
-    /** Variable for counting key-value pairs that are in {@link ValueStatus#RETRY} state. */
-    private AtomicInteger retryEntriesCnt = new AtomicInteger();
-
-    /** Log. */
-    private IgniteLogger log;
-
-    /** Store manager. */
-    private GridCacheStoreManager storeMgr;
-
-    /**
-     * Creates a write-behind cache store for the given store. The constructor only stores its
-     * arguments; the write cache and flusher threads are created by {@link #start()}.
-     *
-     * @param storeMgr Store manager.
-     * @param gridName Grid name.
-     * @param cacheName Cache name.
-     * @param log Grid logger.
-     * @param store {@link CacheStore} that needs to be wrapped.
-     */
-    public GridCacheWriteBehindStore(
-        GridCacheStoreManager storeMgr,
-        String gridName,
-        String cacheName,
-        IgniteLogger log,
-        CacheStore<K, V> store) {
-        this.storeMgr = storeMgr;
-        this.gridName = gridName;
-        this.cacheName = cacheName;
-        this.log = log;
-        this.store = store;
-    }
-
-    /**
-     * Sets initial capacity for the write cache. The value is read by {@link #start()} when the
-     * write cache is created.
-     *
-     * @param initCap Initial capacity.
-     */
-    public void setInitialCapacity(int initCap) {
-        this.initCap = initCap;
-    }
-
-    /**
-     * Sets concurrency level for the write cache. Concurrency level is the expected count of
-     * concurrent threads attempting to update the cache. The value is read by {@link #start()}
-     * when the write cache is created.
-     *
-     * @param concurLvl Concurrency level.
-     */
-    public void setConcurrencyLevel(int concurLvl) {
-        this.concurLvl = concurLvl;
-    }
-
-    /**
-     * Sets the maximum size of the write cache. When the count of unique keys in the write cache
-     * exceeds this value, the eldest entry in the cache is immediately scheduled for write to the
-     * underlying store.
-     *
-     * @param cacheMaxSize Max cache size.
-     */
-    public void setFlushSize(int cacheMaxSize) {
-        this.cacheMaxSize = cacheMaxSize;
-    }
-
-    /**
-     * Gets the maximum size of the write-behind buffer. When the count of unique keys
-     * in the write buffer exceeds this value, the buffer is scheduled for write to the underlying store.
-     * <p/>
-     * If this value is {@code 0}, then flush is performed only on a time-elapsing basis. However,
-     * when this value is {@code 0}, the cache critical size is set to
-     * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}.
-     *
-     * @return Buffer size that triggers flush procedure.
-     */
-    public int getWriteBehindFlushSize() {
-        return cacheMaxSize;
-    }
-
-    /**
-     * Sets the number of threads that will perform store update operations. The value is read by
-     * {@link #start()} when the flusher workers are created.
-     *
-     * @param flushThreadCnt Count of worker threads.
-     */
-    public void setFlushThreadCount(int flushThreadCnt) {
-        this.flushThreadCnt = flushThreadCnt;
-    }
-
-    /**
-     * Gets the configured number of flush threads that perform store update operations.
-     *
-     * @return Count of worker threads.
-     */
-    public int getWriteBehindFlushThreadCount() {
-        return flushThreadCnt;
-    }
-
-    /**
-     * Sets the cache flush frequency. All pending operations on the underlying store will be
-     * performed within a time interval not less than this value.
-     *
-     * @param cacheFlushFreq Time interval value in milliseconds.
-     */
-    public void setFlushFrequency(long cacheFlushFreq) {
-        this.cacheFlushFreq = cacheFlushFreq;
-    }
-
-    /**
-     * Gets the cache flush frequency. All pending operations on the underlying store will be
-     * performed within a time interval not less than this value.
-     * <p/>
-     * If this value is {@code 0}, then flush is performed only when buffer size exceeds flush size.
-     *
-     * @return Flush frequency in milliseconds.
-     */
-    public long getWriteBehindFlushFrequency() {
-        return cacheFlushFreq;
-    }
-
-    /**
-     * Sets the maximum count of similar operations that can be grouped into a single batch.
-     *
-     * @param batchSize Maximum size of a batch.
-     */
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    /**
-     * Gets the maximum count of similar (put or remove) operations that can be grouped into a
-     * single batch.
-     *
-     * @return Maximum size of batch.
-     */
-    public int getWriteBehindStoreBatchSize() {
-        return batchSize;
-    }
-
-    /**
-     * Gets count of entries that were processed by the write-behind store and have not been
-     * flushed to the underlying store yet. Reads the write cache, which is created by
-     * {@link #start()}.
-     *
-     * @return Total count of entries in cache store internal buffer.
-     */
-    public int getWriteBehindBufferSize() {
-        return writeCache.sizex();
-    }
-
-    /**
-     * Gets the store wrapped by this write-behind store.
-     *
-     * @return Underlying store.
-     */
-    public CacheStore<K, V> store() {
-        return store;
-    }
-
-    /**
-     * Performs all the initialization logic for write-behind cache store.
-     * This class must not be used until this method returns.
-     */
-    @Override public void start() {
-        assert cacheFlushFreq != 0 || cacheMaxSize != 0;
-
-        if (stopping.compareAndSet(true, false)) {
-            if (log.isDebugEnabled())
-                log.debug("Starting write-behind store for cache '" + 
cacheName + '\'');
-
-            cacheCriticalSize = (int)(cacheMaxSize * CACHE_OVERFLOW_RATIO);
-
-            if (cacheCriticalSize == 0)
-                cacheCriticalSize = 
CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE;
-
-            flushThreads = new GridWorker[flushThreadCnt];
-
-            writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, 
concurLvl);
-
-            for (int i = 0; i < flushThreads.length; i++) {
-                flushThreads[i] = new Flusher(gridName, "flusher-" + i, log);
-
-                new IgniteThread(flushThreads[i]).start();
-            }
-        }
-    }
-
-    /**
-     * Gets count of write buffer overflow events since initialization. Each overflow event causes
-     * the ongoing flush operation to be performed synchronously.
-     *
-     * @return Count of cache overflow events since start.
-     */
-    public int getWriteBehindTotalCriticalOverflowCount() {
-        return cacheTotalOverflowCntr.get();
-    }
-
-    /**
-     * Gets count of write buffer overflow events in progress at the moment. Each overflow event
-     * causes the ongoing flush operation to be performed synchronously.
-     *
-     * @return Count of cache overflow events currently in progress.
-     */
-    public int getWriteBehindCriticalOverflowCount() {
-        return cacheOverflowCntr.get();
-    }
-
-    /**
-     * Gets count of cache entries that are in a store-retry state. An entry is assigned a
-     * store-retry state when the underlying store failed for some reason and the cache has enough
-     * space to retain this entry till the next try.
-     *
-     * @return Count of entries in store-retry state.
-     */
-    public int getWriteBehindErrorRetryCount() {
-        return retryEntriesCnt.get();
-    }
-
-    /**
-     * Shuts the store down: wakes up the flusher threads and waits for all of them to finish.
-     * No put, get and remove requests will be processed after this method is called. Calling it
-     * on a store that is not started is a no-op.
-     */
-    @Override public void stop() {
-        // Only the call that flips the flag performs the shutdown.
-        if (!stopping.compareAndSet(false, true))
-            return;
-
-        if (log.isDebugEnabled())
-            log.debug("Stopping write-behind store for cache '" + cacheName + '\'');
-
-        wakeUp();
-
-        boolean allJoined = true;
-
-        // Every worker is joined even if an earlier join failed.
-        for (GridWorker flusher : flushThreads) {
-            if (!U.join(flusher, log))
-                allJoined = false;
-        }
-
-        if (!allJoined)
-            log.warning("Shutdown was aborted");
-    }
-
-    /**
-     * Requests a flush of all collected entries to the underlying store by calling
-     * {@code wakeUp()}; this method itself does not write anything to the store.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void forceFlush() throws IgniteCheckedException {
-        wakeUp();
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Delegates directly to the underlying store; the write cache is not consulted.
-     */
-    @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable 
Object... args) {
-        store.loadCache(clo, args);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Keys that have a pending operation in the write cache are answered from it: a pending put
-     * yields its value, a pending remove yields no mapping. All other keys are loaded from the
-     * underlying store.
-     */
-    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
-        if (log.isDebugEnabled())
-            log.debug("Store load all [keys=" + keys + ']');
-
-        Map<K, V> loaded = new HashMap<>();
-
-        Collection<K> remaining = new LinkedList<>();
-
-        for (K key : keys) {
-            StatefulValue<K, V> pending = writeCache.get(key);
-
-            if (pending == null) {
-                remaining.add(key);
-
-                continue;
-            }
-
-            pending.readLock().lock();
-
-            try {
-                StoreOperation op = pending.operation();
-
-                if (op == StoreOperation.PUT)
-                    loaded.put(key, pending.entry().getValue());
-                else
-                    assert op == StoreOperation.RMV : op;
-            }
-            finally {
-                pending.readLock().unlock();
-            }
-        }
-
-        // For items that were not found in queue.
-        if (!remaining.isEmpty()) {
-            Map<K, V> fromStore = store.loadAll(remaining);
-
-            if (fromStore != null)
-                loaded.putAll(fromStore);
-        }
-
-        return loaded;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * A key with a pending operation in the write cache is answered from it: a pending put yields
-     * its value and a pending remove yields {@code null}. Otherwise the key is loaded from the
-     * underlying store.
-     */
-    @Override public V load(K key) {
-        if (log.isDebugEnabled())
-            log.debug("Store load [key=" + key + ']');
-
-        StatefulValue<K, V> val = writeCache.get(key);
-
-        if (val != null) {
-            val.readLock().lock();
-
-            try {
-                switch (val.operation()) {
-                    case PUT:
-                        return val.entry().getValue();
-
-                    case RMV:
-                        return null;
-
-                    default:
-                        // Report the operation being switched on, not the value status.
-                        assert false : "Unexpected operation: " + val.operation();
-                }
-            }
-            finally {
-                val.readLock().unlock();
-            }
-        }
-
-        return store.load(key);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Each entry is passed individually to {@link #write(Entry)}.
-     */
-    @Override public void writeAll(Collection<Entry<? extends K, ? extends V>> 
entries) {
-        for (Entry<? extends K, ? extends V> e : entries)
-            write(e);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * The entry is only recorded in the write cache as a pending put; interruption while waiting
-     * for a concurrent flush is reported as a {@link CacheWriterException}.
-     */
-    @Override public void write(Entry<? extends K, ? extends V> entry) {
-        if (log.isDebugEnabled())
-            log.debug("Store put [key=" + entry.getKey() + ", val=" + entry.getValue() + ']');
-
-        try {
-            updateCache(entry.getKey(), entry, StoreOperation.PUT);
-        }
-        catch (IgniteInterruptedCheckedException e) {
-            throw new CacheWriterException(U.convertExceptionNoWrap(e));
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Each key is passed individually to {@link #delete(Object)}.
-     */
-    @Override public void deleteAll(Collection<?> keys) {
-        for (Object key : keys)
-            delete(key);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * The key is only recorded in the write cache as a pending remove; interruption while waiting
-     * for a concurrent flush is reported as a {@link CacheWriterException}.
-     */
-    @SuppressWarnings("unchecked")
-    @Override public void delete(Object key) {
-        if (log.isDebugEnabled())
-            log.debug("Store remove [key=" + key + ']');
-
-        try {
-            updateCache((K)key, null, StoreOperation.RMV);
-        }
-        catch (IgniteInterruptedCheckedException e) {
-            throw new CacheWriterException(U.convertExceptionNoWrap(e));
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Intentionally does nothing and is not forwarded to the underlying store.
-     */
-    @Override public void sessionEnd(boolean commit) {
-        // No-op.
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * The string is built by {@code S.toString} for this class.
-     */
-    @Override public String toString() {
-        return S.toString(GridCacheWriteBehindStore.class, this);
-    }
-
-    /**
-     * Performs flush-consistent cache update for the given key.
-     * <p>
-     * If the write cache has no value for the key, a new one is inserted. Otherwise the existing
-     * value is examined under its write lock: a value that is being flushed is waited for and the
-     * insertion is retried, a value that is already flushed causes a retry, and a value that is new
-     * or awaiting retry is updated in place.
-     * <p>
-     * After the update, if the write cache size exceeds the critical size, one value is flushed
-     * in the calling thread; otherwise, if it exceeds the configured flush size, {@code wakeUp()}
-     * is called.
-     *
-     * @param key Key for which update is performed.
-     * @param val New value, may be null for remove operation.
-     * @param operation Store operation (put or remove) to record for the key.
-     * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
-     */
-    private void updateCache(K key,
-        @Nullable Entry<? extends K, ? extends V> val,
-        StoreOperation operation)
-        throws IgniteInterruptedCheckedException {
-        StatefulValue<K, V> newVal = new StatefulValue<>(val, operation);
-
-        StatefulValue<K, V> prev;
-
-        while ((prev = writeCache.putIfAbsent(key, newVal)) != null) {
-            prev.writeLock().lock();
-
-            try {
-                if (prev.status() == ValueStatus.PENDING) {
-                    // Flush process in progress, try again.
-                    prev.waitForFlush();
-
-                    continue;
-                }
-                else if (prev.status() == ValueStatus.FLUSHED)
-                    // This entry was deleted from map before we acquired the 
lock.
-                    continue;
-                else if (prev.status() == ValueStatus.RETRY)
-                    // New value has come, old value is no longer in RETRY 
state,
-                    retryEntriesCnt.decrementAndGet();
-
-                assert prev.status() == ValueStatus.NEW || prev.status() == 
ValueStatus.RETRY;
-
-                prev.update(val, operation, ValueStatus.NEW);
-
-                break;
-            }
-            finally {
-                prev.writeLock().unlock();
-            }
-        }
-
-        // Now check the map size
-        if (writeCache.sizex() > cacheCriticalSize)
-            // Perform single store update in the same thread.
-            flushSingleValue();
-        else if (cacheMaxSize > 0 && writeCache.sizex() > cacheMaxSize)
-            wakeUp();
-    }
-
-    /**
-     * Flushes one upcoming value to the underlying store. Called from
-     * {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds
-     * critical size.
-     * <p>
-     * The first buffered value that is neither {@code PENDING} nor {@code FLUSHED} is marked {@code PENDING} and
-     * passed to {@code applyBatch} as a single-entry batch without session initialization.
-     * {@code cacheOverflowCntr} is incremented for the duration of the call; {@code cacheTotalOverflowCntr} is
-     * incremented after {@code applyBatch} returns.
-     */
-    private void flushSingleValue() {
-        cacheOverflowCntr.incrementAndGet();
-
-        try {
-            Map<K, StatefulValue<K, V>> batch = null;
-
-            for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) {
-                StatefulValue<K, V> val = e.getValue();
-
-                val.writeLock().lock();
-
-                try {
-                    ValueStatus status = val.status();
-
-                    if (acquired(status))
-                        // Another thread is helping us, continue to the next entry.
-                        continue;
-
-                    // The value leaves RETRY state here, so it is no longer counted as a retry entry.
-                    if (val.status() == ValueStatus.RETRY)
-                        retryEntriesCnt.decrementAndGet();
-
-                    assert retryEntriesCnt.get() >= 0;
-
-                    val.status(ValueStatus.PENDING);
-
-                    batch = Collections.singletonMap(e.getKey(), val);
-                }
-                finally {
-                    val.writeLock().unlock();
-                }
-
-                // NOTE(review): batch is always a non-empty singleton map here, because the skip path above
-                // continues to the next entry before reaching this point.
-                if (!batch.isEmpty()) {
-                    applyBatch(batch, false);
-
-                    cacheTotalOverflowCntr.incrementAndGet();
-
-                    return;
-                }
-            }
-        }
-        finally {
-            cacheOverflowCntr.decrementAndGet();
-        }
-    }
-
-    /**
-     * Performs batch operation on underlying store.
-     * <p>
-     * All values in the batch are expected to be {@code PENDING} and to share one store operation (both are
-     * asserted). When {@code updateStore} returns {@code true}, each value is marked {@code FLUSHED}, removed from
-     * the write cache and its waiters are signalled. Otherwise each value is marked {@code RETRY}, counted in
-     * {@code retryEntriesCnt} and its waiters are signalled.
-     *
-     * @param valMap Batch map.
-     * @param initSes {@code True} if need to initialize session.
-     */
-    private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean 
initSes) {
-        assert valMap.size() <= batchSize;
-
-        // Operation shared by the whole batch; taken from the first value.
-        StoreOperation operation = null;
-
-        // Construct a map for underlying store
-        Map<K, Entry<? extends K, ? extends V>> batch = 
U.newLinkedHashMap(valMap.size());
-
-        for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
-            if (operation == null)
-                operation = e.getValue().operation();
-
-            assert operation == e.getValue().operation();
-
-            assert e.getValue().status() == ValueStatus.PENDING;
-
-            batch.put(e.getKey(), e.getValue().entry());
-        }
-
-        if (updateStore(operation, batch, initSes)) {
-            // Values may be dropped from the write cache.
-            for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
-                StatefulValue<K, V> val = e.getValue();
-
-                val.writeLock().lock();
-
-                try {
-                    val.status(ValueStatus.FLUSHED);
-
-                    StatefulValue<K, V> prev = writeCache.remove(e.getKey());
-
-                    // Additional check to ensure consistency.
-                    assert prev == val : "Map value for key " + e.getKey() + " 
was updated during flush";
-
-                    // Release threads waiting for this value's flush to finish.
-                    val.signalFlushed();
-                }
-                finally {
-                    val.writeLock().unlock();
-                }
-            }
-        }
-        else {
-            // Exception occurred, we must set RETRY status
-            for (StatefulValue<K, V> val : valMap.values()) {
-                val.writeLock().lock();
-
-                try {
-                    val.status(ValueStatus.RETRY);
-
-                    retryEntriesCnt.incrementAndGet();
-
-                    // Waiters are released on failure as well, so they can re-check the status.
-                    val.signalFlushed();
-                }
-                finally {
-                    val.writeLock().unlock();
-                }
-            }
-        }
-    }
-
-    /**
-     * Tries to update store with the given values and returns {@code true} in case of success.
-     *
-     * <p/> If any exception in underlying store is occurred, this method checks the map size.
-     * If map size exceeds the critical value, or the store is stopping, then it returns {@code true}
-     * and the values will be lost (a warning is logged for each of them). Otherwise it will return
-     * {@code false} and values will be retained in write cache.
-     *
-     * <p/> When {@code initSes} is set and a store manager is present, a session is initialized before the
-     * store operation and ended after it.
-     *
-     * @param operation Status indicating operation that should be performed.
-     * @param vals Key-Value map.
-     * @param initSes {@code True} if need to initialize session.
-     * @return {@code true} if value may be deleted from the write cache,
-     *         {@code false} otherwise
-     */
-    private boolean updateStore(StoreOperation operation,
-        Map<K, Entry<? extends K, ? extends  V>> vals,
-        boolean initSes) {
-
-        if (initSes && storeMgr != null)
-            storeMgr.initSession(null);
-
-        try {
-            // Stays true unless the store call below completes normally.
-            boolean threwEx = true;
-
-            try {
-                switch (operation) {
-                    case PUT:
-                        store.writeAll(vals.values());
-
-                        break;
-
-                    case RMV:
-                        store.deleteAll(vals.keySet());
-
-                        break;
-
-                    default:
-                        assert false : "Unexpected operation: " + operation;
-                }
-
-                threwEx = false;
-
-                return true;
-            }
-            finally {
-                // The session is ended on both the normal and the exceptional path.
-                if (initSes && storeMgr != null)
-                    storeMgr.endSession(null, threwEx);
-            }
-        }
-        catch (Exception e) {
-            LT.warn(log, e, "Unable to update underlying store: " + store);
-
-            // Give up on these values when the buffer is over the critical size or the store is stopping.
-            if (writeCache.sizex() > cacheCriticalSize || stopping.get()) {
-                for (Map.Entry<K, Entry<? extends K, ? extends  V>> entry : 
vals.entrySet()) {
-                    Object val = entry.getValue() != null ? 
entry.getValue().getValue() : null;
-
-                    log.warning("Failed to update store (value will be lost as 
current buffer size is greater " +
-                        "than 'cacheCriticalSize' or node has been stopped 
before store was repaired) [key=" +
-                        entry.getKey() + ", val=" + val + ", op=" + operation 
+ "]");
-                }
-
-                return true;
-            }
-
-            return false;
-        }
-    }
-
-    /**
-     * Wakes up flushing threads if map size exceeded maximum value or in case of shutdown.
-     * <p>
-     * Signals all waiters on {@code canFlush} while holding {@code flushLock}.
-     */
-    private void wakeUp() {
-        flushLock.lock();
-
-        try {
-            canFlush.signalAll();
-        }
-        finally {
-            flushLock.unlock();
-        }
-    }
-
-    /**
-     * Worker that flushes written values to the underlying storage, either after a timed wait or when it is
-     * signalled through {@code canFlush}.
-     */
-    private class Flusher extends GridWorker {
-        /**
-         * @param gridName Grid name, passed to the {@link GridWorker} constructor.
-         * @param name Worker name, passed to the {@link GridWorker} constructor.
-         * @param log Logger, passed to the {@link GridWorker} constructor.
-         */
-        protected Flusher(String gridName, String name, IgniteLogger log) {
-            super(gridName, name, log);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
-            // Keep running until the store is stopping and the write cache has been drained.
-            while (!stopping.get() || writeCache.sizex() > 0) {
-                awaitOperationsAvailable();
-
-                flushCache(writeCache.entrySet().iterator());
-            }
-        }
-
-        /**
-         * This method awaits until enough elements in map are available or given timeout is over.
-         * <p>
-         * The wait on {@code canFlush} is skipped when {@code cacheMaxSize} is non-zero and the write cache size
-         * exceeds it. With a positive {@code cacheFlushFreq} the wait is timed, otherwise it lasts until
-         * signalled. The check repeats while the write cache is empty and the store is not stopping.
-         *
-         * @throws InterruptedException If awaiting was interrupted.
-         */
-        private void awaitOperationsAvailable() throws InterruptedException {
-            flushLock.lock();
-
-            try {
-                do {
-                    if (writeCache.sizex() <= cacheMaxSize || cacheMaxSize == 
0) {
-                        if (cacheFlushFreq > 0)
-                            canFlush.await(cacheFlushFreq, 
TimeUnit.MILLISECONDS);
-                        else
-                            canFlush.await();
-                    }
-                }
-                while (writeCache.sizex() == 0 && !stopping.get());
-            }
-            finally {
-                flushLock.unlock();
-            }
-        }
-
-        /**
-         * Removes values from the write cache and performs corresponding operation
-         * on the underlying store.
-         * <p>
-         * Values that are not already {@code PENDING} or {@code FLUSHED} are marked {@code PENDING} and collected
-         * into batches. A batch is applied when the store operation changes between consecutive values or when
-         * {@code batchSize} values have been collected; the remainder is applied after the iteration.
-         *
-         * @param it Iterator for write cache.
-         */
-        private void flushCache(Iterator<Map.Entry<K,StatefulValue<K, V>>> it) 
{
-            // Operation of the batch being collected; null means a new batch is started.
-            StoreOperation operation = null;
-
-            // Batch that is ready to be applied (if any) and the batch still being collected.
-            Map<K, StatefulValue<K, V>> batch = null;
-            Map<K, StatefulValue<K, V>> pending  = 
U.newLinkedHashMap(batchSize);
-
-            while (it.hasNext()) {
-                Map.Entry<K, StatefulValue<K, V>> e = it.next();
-
-                StatefulValue<K, V> val = e.getValue();
-
-                val.writeLock().lock();
-
-                try {
-                    ValueStatus status = val.status();
-
-                    if (acquired(status))
-                        // Another thread is helping us, continue to the next entry.
-                        continue;
-
-                    // The value leaves RETRY state here, so it is no longer counted as a retry entry.
-                    if (status == ValueStatus.RETRY)
-                        retryEntriesCnt.decrementAndGet();
-
-                    assert retryEntriesCnt.get() >= 0;
-
-                    val.status(ValueStatus.PENDING);
-
-                    // We scan for the next operation and apply batch on operation change.
-                    // Null means new batch.
-                    if (operation == null)
-                        operation = val.operation();
-
-                    if (operation != val.operation()) {
-                        // Operation is changed, so we need to perform a batch.
-                        batch = pending;
-                        pending = U.newLinkedHashMap(batchSize);
-
-                        operation = val.operation();
-
-                        pending.put(e.getKey(), val);
-                    }
-                    else
-                        pending.put(e.getKey(), val);
-
-                    // Batch is full: hand it over and start a new one.
-                    if (pending.size() == batchSize) {
-                        batch = pending;
-                        pending = U.newLinkedHashMap(batchSize);
-
-                        operation = null;
-                    }
-                }
-                finally {
-                    val.writeLock().unlock();
-                }
-
-                // The ready batch is applied after the value's write lock has been released.
-                if (batch != null && !batch.isEmpty()) {
-                    applyBatch(batch, true);
-                    batch = null;
-                }
-            }
-
-            // Process the remainder.
-            if (!pending.isEmpty())
-                applyBatch(pending, true);
-        }
-    }
-
-    /**
-     * For test purposes only. Returns the write cache map itself, not a copy.
-     *
-     * @return Write cache for the underlying store operations.
-     */
-    Map<K, StatefulValue<K, V>> writeCache() {
-        return writeCache;
-    }
-
-    /**
-     * Enumeration that represents possible operations on the underlying store.
-     */
-    private enum StoreOperation {
-        /** Put key-value pair to the underlying store (applied through {@code store.writeAll}). */
-        PUT,
-
-        /** Remove key from the underlying store (applied through {@code store.deleteAll}). */
-        RMV
-    }
-
-    /**
-     * Enumeration that represents possible states of value in the map.
-     */
-    private enum ValueStatus {
-        /** Value is scheduled for write or delete from the underlying store but has not been captured by flusher. */
-        NEW,
-
-        /** Value is captured by flusher and store operation is performed at the moment. */
-        PENDING,
-
-        /** Store operation has failed and it will be re-tried at the next flush. */
-        RETRY,
-
-        /** Store operation succeeded and this value will be removed by flusher. */
-        FLUSHED,
-    }
-
-    /**
-     * Tells whether a value with the given status has already been taken for flushing or has been flushed.
-     *
-     * @param status Status to check.
-     * @return {@code true} for {@link ValueStatus#PENDING} and {@link ValueStatus#FLUSHED},
-     *      {@code false} for any other status.
-     */
-    private boolean acquired(ValueStatus status) {
-        if (status == ValueStatus.PENDING)
-            return true;
-
-        return status == ValueStatus.FLUSHED;
-    }
-
-    /**
-     * A state-value-operation trio. The instance is itself the read-write lock guarding its fields: the visible
-     * callers take {@link #writeLock()} before reading or changing the status.
-     *
-     * @param <K> Key type.
-     * @param <V> Value type.
-     */
-    private static class StatefulValue<K, V> extends ReentrantReadWriteLock {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Value. */
-        @GridToStringInclude
-        private Entry<? extends K, ? extends V> val;
-
-        /** Store operation. */
-        private StoreOperation storeOperation;
-
-        /** Value status. */
-        private ValueStatus valStatus;
-
-        /** Condition to wait for flush event. Bound to the write lock and never reassigned. */
-        private final Condition flushCond = writeLock().newCondition();
-
-        /**
-         * Creates a state-value pair with {@link ValueStatus#NEW} status.
-         *
-         * @param val Value.
-         * @param storeOperation Store operation.
-         */
-        private StatefulValue(Entry<? extends K, ? extends V> val, StoreOperation storeOperation) {
-            assert storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV;
-
-            this.val = val;
-            this.storeOperation = storeOperation;
-            valStatus = ValueStatus.NEW;
-        }
-
-        /**
-         * @return Stored value.
-         */
-        private Entry<? extends K, ? extends V> entry() {
-            return val;
-        }
-
-        /**
-         * @return Store operation.
-         */
-        private StoreOperation operation() {
-            return storeOperation;
-        }
-
-        /**
-         * @return Value status
-         */
-        private ValueStatus status() {
-            return valStatus;
-        }
-
-        /**
-         * Updates value status.
-         *
-         * @param valStatus Value status.
-         */
-        private void status(ValueStatus valStatus) {
-            this.valStatus = valStatus;
-        }
-
-        /**
-         * Updates value, store operation and value status together.
-         *
-         * @param val Value.
-         * @param storeOperation Store operation.
-         * @param valStatus Value status.
-         */
-        private void update(@Nullable Entry<? extends K, ? extends V> val,
-            StoreOperation storeOperation,
-            ValueStatus valStatus) {
-            this.val = val;
-            this.storeOperation = storeOperation;
-            this.valStatus = valStatus;
-        }
-
-        /**
-         * Awaits a signal on flush condition. The condition belongs to the write lock, so the caller must hold
-         * the write lock.
-         *
-         * @throws IgniteInterruptedCheckedException If thread was interrupted.
-         */
-        private void waitForFlush() throws IgniteInterruptedCheckedException {
-            U.await(flushCond);
-        }
-
-        /**
-         * Signals flush condition to all waiters. The condition belongs to the write lock, so the caller must
-         * hold the write lock.
-         */
-        @SuppressWarnings({"SignalWithoutCorrespondingAwait"})
-        private void signalFlushed() {
-            flushCond.signalAll();
-        }
-
-        /**
-         * Compares by value and value status; the store operation is not part of the comparison.
-         *
-         * @param o Object to compare with.
-         * @return {@code true} if {@code o} is a {@code StatefulValue} with equal value and status.
-         */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (!(o instanceof StatefulValue))
-                return false;
-
-            // Wildcard type instead of a raw type: only Object-level comparison is needed.
-            StatefulValue<?, ?> other = (StatefulValue<?, ?>)o;
-
-            return F.eq(val, other.val) && F.eq(valStatus, other.valStatus);
-        }
-
-        /**
-         * Hash code over the same fields that {@link #equals(Object)} compares.
-         *
-         * @return Hash code.
-         */
-        @Override public int hashCode() {
-            int res = val != null ? val.hashCode() : 0;
-
-            res = 31 * res + valStatus.hashCode();
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(StatefulValue.class, this);
-        }
-    }
-}

Reply via email to