# ignite-42

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/25f7984a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/25f7984a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/25f7984a

Branch: refs/heads/ignite-32
Commit: 25f7984a2a581d94fac6bc282cc9f8fb8097c3b7
Parents: ee7bb3f
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Jan 21 10:28:40 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Jan 21 12:36:45 2015 +0300

----------------------------------------------------------------------
 .../integration/GridClientAbstractSelfTest.java |   1 +
 .../apache/ignite/cache/CacheConfiguration.java |  18 +
 .../apache/ignite/cache/store/CacheStore.java   |  12 +
 .../processors/cache/GridCacheAttributes.java   |  44 +-
 .../processors/cache/GridCacheContext.java      |   7 +
 .../processors/cache/GridCacheMapEntry.java     |   4 +-
 .../processors/cache/GridCacheProcessor.java    |  17 +-
 .../processors/cache/GridCacheStoreManager.java |  78 ++--
 .../dht/atomic/GridDhtAtomicCache.java          |   6 +-
 .../local/atomic/GridLocalAtomicCache.java      |   4 +-
 .../transactions/IgniteTxLocalAdapter.java      |   5 +-
 ...CacheJdbcBlobStoreMultithreadedSelfTest.java |   1 +
 .../cache/IgniteCacheAbstractTest.java          |   1 +
 .../IgniteCacheAtomicStoreSessionTest.java      |  55 +++
 .../IgniteCacheLoaderWriterAbstractTest.java    | 157 +++----
 .../IgniteCacheStoreSessionAbstractTest.java    | 303 +++++++++++++
 .../IgniteCacheStoreSessionTest.java            | 435 -------------------
 .../IgniteCacheTxStoreSessionTest.java          | 285 ++++++++++++
 .../GridCacheLoadOnlyStoreAdapterSelfTest.java  |   1 +
 .../cache/GridCacheAbstractSelfTest.java        |   1 +
 .../cache/GridCacheBasicStoreAbstractTest.java  |   1 +
 ...acheBasicStoreMultithreadedAbstractTest.java |   1 +
 ...idCacheConfigurationConsistencySelfTest.java |   5 +
 ...idCacheGetAndTransformStoreAbstractTest.java |   1 +
 .../GridCacheGroupLockAbstractSelfTest.java     |   1 +
 .../cache/GridCacheLifecycleAwareSelfTest.java  |   1 +
 .../cache/GridCachePartitionedWritesTest.java   |   1 +
 .../cache/GridCacheReloadSelfTest.java          |   1 +
 .../cache/GridCacheStorePutxSelfTest.java       |   1 +
 .../cache/GridCacheSwapReloadSelfTest.java      |   2 +
 .../GridCacheWriteBehindStoreAbstractTest.java  |   1 +
 ...BehindStorePartitionedMultiNodeSelfTest.java |   1 +
 .../IgniteTxExceptionAbstractSelfTest.java      |   1 +
 .../IgniteTxStoreExceptionAbstractSelfTest.java |   1 +
 ...actQueueFailoverDataConsistencySelfTest.java |   1 +
 ...CacheAtomicReferenceApiSelfAbstractTest.java |   2 +
 .../GridCacheClientModesAbstractSelfTest.java   |   2 +
 ...chePartitionedReloadAllAbstractSelfTest.java |   1 +
 .../IgniteTxPreloadAbstractTest.java            |   6 +-
 ...heAbstractTransformWriteThroughSelfTest.java |   1 +
 .../dht/GridCacheColocatedDebugTest.java        |   1 +
 .../near/GridCacheGetStoreErrorSelfTest.java    |   1 +
 .../near/GridCacheNearOneNodeSelfTest.java      |   1 +
 .../GridCacheNearPartitionedClearSelfTest.java  |   1 +
 ...ePartitionedBasicStoreMultiNodeSelfTest.java |   1 +
 .../GridCachePartitionedLoadCacheSelfTest.java  |   1 +
 .../GridCachePartitionedStorePutSelfTest.java   |   1 +
 .../near/GridPartitionedBackupLoadSelfTest.java |   1 +
 .../GridCacheBatchEvictUnswapSelfTest.java      |   1 +
 .../GridCacheEmptyEntriesAbstractSelfTest.java  |   1 +
 .../GridCacheEvictionTouchSelfTest.java         |   1 +
 .../local/GridCacheLocalLoadAllSelfTest.java    |   1 +
 ...ridCacheContinuousQueryAbstractSelfTest.java |   1 +
 .../GridCacheWriteBehindStoreLoadTest.java      |   1 +
 .../loadtests/hashmap/GridCacheTestContext.java |   8 +-
 .../swap/GridSwapEvictAllBenchmark.java         |   1 +
 .../bamboo/GridDataGridTestSuite.java           |   5 +-
 .../cache/GridCacheAbstractQuerySelfTest.java   |   1 +
 .../cache/GridCacheQueryLoadSelfTest.java       |   1 +
 59 files changed, 921 insertions(+), 576 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
 
b/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
index 6eb6184..9cc1fc2 100644
--- 
a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
+++ 
b/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
@@ -266,6 +266,7 @@ public abstract class GridClientAbstractSelfTest extends 
GridCommonAbstractTest
         cfg.setCacheStoreFactory(new 
FactoryBuilder.SingletonFactory(cacheStore));
         cfg.setWriteThrough(true);
         cfg.setReadThrough(true);
+        cfg.setLoadPreviousValue(true);
 
         cfg.setSwapEnabled(true);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/apache/ignite/cache/CacheConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/CacheConfiguration.java 
b/modules/core/src/main/java/org/apache/ignite/cache/CacheConfiguration.java
index 54c3444..5a91df8 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheConfiguration.java
@@ -239,6 +239,9 @@ public class CacheConfiguration extends 
MutableConfiguration {
     /** */
     private Factory storeFactory;
 
+    /** */
+    private boolean loadPrevVal;
+
     /** Node group resolver. */
     private GridCacheAffinityFunction aff;
 
@@ -383,6 +386,7 @@ public class CacheConfiguration extends 
MutableConfiguration {
         isWriteThrough = cc.isWriteThrough();
         keepPortableInStore = cc.isKeepPortableInStore();
         listenerConfigurations = cc.listenerConfigurations;
+        loadPrevVal = cc.isLoadPreviousValue();
         offHeapMaxMem = cc.getOffHeapMaxMemory();
         maxConcurrentAsyncOps = cc.getMaxConcurrentAsyncOperations();
         maxQryIterCnt = cc.getMaximumQueryIteratorCount();
@@ -790,6 +794,20 @@ public class CacheConfiguration extends 
MutableConfiguration {
     }
 
     /**
+     * @return
+     */
+    public boolean isLoadPreviousValue() {
+        return loadPrevVal;
+    }
+
+    /**
+     * @param loadPrevVal
+     */
+    public void setLoadPreviousValue(boolean loadPrevVal) {
+        this.loadPrevVal = loadPrevVal;
+    }
+
+    /**
      * Gets factory for underlying persistent storage for read-through and 
write-through operations.
      *
      * @return Cache store factory.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
index 0b0832e..d2593ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.store.jdbc.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.portables.*;
+import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
 import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
@@ -125,6 +126,10 @@ public abstract class CacheStore<K, V> implements 
CacheLoader<K, V>, CacheWriter
     /** */
     private CacheStoreSession ses;
 
+    /** */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
     /**
      * Loads all values from underlying persistent storage. Note that keys are 
not
      * passed, so it is up to implementation to figure out what to load. This 
method
@@ -165,4 +170,11 @@ public abstract class CacheStore<K, V> implements 
CacheLoader<K, V>, CacheWriter
     @Nullable public CacheStoreSession session() {
         return ses;
     }
+
+    /**
+     * @return {@link Ignite} instance.
+     */
+    public Ignite ignite() {
+        return ignite;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java
index da76ecc..711d375 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java
@@ -88,9 +88,6 @@ public class GridCacheAttributes implements Externalizable {
     /** Flag indicating whether  query indexing is enabled. */
     private boolean qryIdxEnabled;
 
-    /** Flag indicating whether GridGain should activate 
read-through/write-through behaviour by default. */
-    private boolean storeEnabled;
-
     /** Flag indicating whether GridGain should use write-behind behaviour for 
the cache store. */
     private boolean writeBehindEnabled;
 
@@ -154,6 +151,15 @@ public class GridCacheAttributes implements Externalizable 
{
     /** Transaction Manager lookup class name. */
     private String tmLookupClsName;
 
+    /** Store read-through flag. */
+    private boolean readThrough;
+
+    /** Store write-through flag. */
+    private boolean writeThrough;
+
+    /** Store load previous value flag. */
+    private boolean loadPrevVal;
+
     /**
      * @param cfg Cache configuration.
      * @param store Cache store.
@@ -167,13 +173,14 @@ public class GridCacheAttributes implements 
Externalizable {
         evictNearSync = cfg.isEvictNearSynchronized();
         evictSync = cfg.isEvictSynchronized();
         indexingSpiName = cfg.getIndexingSpiName();
+        loadPrevVal = cfg.isLoadPreviousValue();
         name = cfg.getName();
         partDistro = GridCacheUtils.distributionMode(cfg);
         preloadBatchSize = cfg.getPreloadBatchSize();
         preloadMode = cfg.getPreloadMode();
         qryIdxEnabled = cfg.isQueryIndexEnabled();
+        readThrough = cfg.isReadThrough();
         seqReserveSize = cfg.getAtomicSequenceReserveSize();
-        storeEnabled = store != null;
         storeValBytes = cfg.isStoreValueBytes();
         swapEnabled = cfg.isSwapEnabled();
         ttl = cfg.getDefaultTimeToLive();
@@ -183,6 +190,7 @@ public class GridCacheAttributes implements Externalizable {
         writeBehindFlushSize = cfg.getWriteBehindFlushSize();
         writeBehindFlushThreadCnt = cfg.getWriteBehindFlushThreadCount();
         writeSyncMode = cfg.getWriteSynchronizationMode();
+        writeThrough = cfg.isWriteThrough();
 
         affMapperClsName = className(cfg.getAffinityMapper());
 
@@ -451,10 +459,24 @@ public class GridCacheAttributes implements 
Externalizable {
     }
 
     /**
-     * @return Flag indicating whether GridGain should activate 
read-through/write-through behaviour by default.
+     * @return Flag indicating whether read-through behaviour is enabled.
+     */
+    public boolean readThrough() {
+        return readThrough;
+    }
+
+    /**
+     * @return Flag indicating whether read-through behaviour is enabled.
+     */
+    public boolean writeThrough() {
+        return writeThrough;
+    }
+
+    /**
+     * @return Flag indicating whether old value is loaded from store for 
cache operation.
      */
-    public boolean storeEnabled() {
-        return storeEnabled;
+    public boolean loadPreviousValue() {
+        return loadPrevVal;
     }
 
     /**
@@ -516,13 +538,14 @@ public class GridCacheAttributes implements 
Externalizable {
         out.writeBoolean(evictNearSync);
         out.writeBoolean(evictSync);
         U.writeString(out, indexingSpiName);
+        out.writeBoolean(loadPrevVal);
         U.writeString(out, name);
         U.writeEnum0(out, partDistro);
         out.writeInt(preloadBatchSize);
         U.writeEnum0(out, preloadMode);
         out.writeBoolean(qryIdxEnabled);
+        out.writeBoolean(readThrough);
         out.writeInt(seqReserveSize);
-        out.writeBoolean(storeEnabled);
         out.writeBoolean(storeValBytes);
         out.writeBoolean(swapEnabled);
         out.writeLong(ttl);
@@ -532,6 +555,7 @@ public class GridCacheAttributes implements Externalizable {
         out.writeInt(writeBehindFlushSize);
         out.writeInt(writeBehindFlushThreadCnt);
         U.writeEnum0(out, writeSyncMode);
+        out.writeBoolean(writeThrough);
 
         U.writeString(out, affClsName);
         U.writeString(out, affMapperClsName);
@@ -560,13 +584,14 @@ public class GridCacheAttributes implements 
Externalizable {
         evictNearSync = in.readBoolean();
         evictSync  = in.readBoolean();
         indexingSpiName = U.readString(in);
+        loadPrevVal = in.readBoolean();
         name = U.readString(in);
         partDistro = 
GridCacheDistributionMode.fromOrdinal(U.readEnumOrdinal0(in));
         preloadBatchSize = in.readInt();
         preloadMode = GridCachePreloadMode.fromOrdinal(U.readEnumOrdinal0(in));
         qryIdxEnabled = in.readBoolean();
+        readThrough = in.readBoolean();
         seqReserveSize = in.readInt();
-        storeEnabled = in.readBoolean();
         storeValBytes = in.readBoolean();
         swapEnabled = in.readBoolean();
         ttl = in.readLong();
@@ -576,6 +601,7 @@ public class GridCacheAttributes implements Externalizable {
         writeBehindFlushSize = in.readInt();
         writeBehindFlushThreadCnt = in.readInt();
         writeSyncMode = 
GridCacheWriteSynchronizationMode.fromOrdinal(U.readEnumOrdinal0(in));
+        writeThrough = in.readBoolean();
 
         affClsName = U.readString(in);
         affMapperClsName = U.readString(in);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
index 7508ac4..b61b4cf 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
@@ -1406,6 +1406,13 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     }
 
     /**
+     * @return {@code True} if store read-through mode is enabled.
+     */
+    public boolean loadPreviousValue() {
+        return cacheCfg.isLoadPreviousValue();
+    }
+
+    /**
      * @return {@code True} if store write-through is enabled.
      */
     public boolean writeThrough() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index d9c84ab..12e7b82 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -1426,7 +1426,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
 
             GridCacheValueBytes oldBytes = valueBytesUnlocked();
 
-            if (needVal && old == null && cctx.readThrough()) {
+            if (needVal && old == null && (cctx.readThrough() && (op == 
TRANSFORM || cctx.loadPreviousValue()))) {
                 old = readThrough(null, key, false, CU.<K, V>empty(), subjId, 
taskName);
 
                 // Detach value before index update.
@@ -1766,7 +1766,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
 
             GridCacheValueBytes oldBytes = valueBytesUnlocked();
 
-            if (needVal && old == null && cctx.readThrough()) {
+            if (needVal && old == null && (cctx.readThrough() && (op == 
TRANSFORM || cctx.loadPreviousValue()))) {
                 old = readThrough(null, key, false, CU.<K, V>empty(), subjId, 
taskName);
 
                 // Detach value before index update.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
index e013216..793db4e 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
@@ -598,6 +598,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         Collection<GridCacheAdapter<?, ?>> startSeq = new 
ArrayList<>(cfgs.length);
 
+        IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new 
IdentityHashMap<>();
+
         for (int i = 0; i < cfgs.length; i++) {
             CacheConfiguration cfg = new CacheConfiguration(cfgs[i]);
 
@@ -632,7 +634,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             GridCacheTtlManager ttlMgr = new GridCacheTtlManager();
             GridCacheDrManager drMgr = 
ctx.createComponent(GridCacheDrManager.class);
 
-            GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, 
cfgStore, cfg);
+            GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, 
sesHolders, cfgStore, cfg);
 
             GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
                 ctx,
@@ -1116,10 +1118,21 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                         CU.checkAttributeMismatch(log, rmtAttr.cacheName(), 
rmt, "evictionPolicy", "Eviction policy",
                             locAttr.evictionPolicyClassName(), 
rmtAttr.evictionPolicyClassName(), true);
 
-                        if (!skipStoreConsistencyCheck(locAttr, rmtAttr))
+                        if (!skipStoreConsistencyCheck(locAttr, rmtAttr)) {
                             CU.checkAttributeMismatch(log, 
rmtAttr.cacheName(), rmt, "store", "Cache store",
                                 locAttr.storeClassName(), 
rmtAttr.storeClassName(), true);
 
+                            CU.checkAttributeMismatch(log, 
rmtAttr.cacheName(), rmt, "readThrough",
+                                "Read through enabled", locAttr.readThrough(), 
locAttr.readThrough(), true);
+
+                            CU.checkAttributeMismatch(log, 
rmtAttr.cacheName(), rmt, "writeThrough",
+                                "Write through enabled", 
locAttr.writeThrough(), locAttr.writeThrough(), true);
+
+                            CU.checkAttributeMismatch(log, 
rmtAttr.cacheName(), rmt, "loadPreviousValue",
+                                "Load previous value enabled", 
locAttr.loadPreviousValue(),
+                                locAttr.loadPreviousValue(), true);
+                        }
+
                         CU.checkAttributeMismatch(log, rmtAttr.cacheName(), 
rmt, "cloner", "Cache cloner",
                             locAttr.clonerClassName(), 
rmtAttr.clonerClassName(), false);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
index aeb9110..7d92f83 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
@@ -56,10 +56,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
     private final CacheStoreBalancingWrapper<K, Object> singleThreadGate;
 
     /** */
-    private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>();
-
-    /** */
-    private final boolean sesEnabled;
+    private final ThreadLocal<SessionData> sesHolder;
 
     /** */
     private final boolean locStore;
@@ -69,12 +66,15 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
 
     /**
      * @param ctx Kernal context.
+     * @param sesHolders Session holders map to use the same session holder 
for different managers if they use
+     *        the same store instance.
      * @param cfgStore Store provided in configuration.
      * @param cfg Cache configuration.
      * @throws IgniteCheckedException In case of error.
      */
     @SuppressWarnings("unchecked")
     public GridCacheStoreManager(GridKernalContext ctx,
+        IdentityHashMap<CacheStore, ThreadLocal> sesHolders,
         @Nullable CacheStore<K, Object> cfgStore,
         CacheConfiguration cfg) throws IgniteCheckedException {
         this.cfgStore = cfgStore;
@@ -83,24 +83,34 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
 
         singleThreadGate = store == null ? null : new 
CacheStoreBalancingWrapper<>(store);
 
-        if (cfgStore != null && cfg.getAtomicityMode() == TRANSACTIONAL) {
+        ThreadLocal<SessionData> sesHolder0 = null;
+
+        if (cfgStore != null) {
             try {
-                Field sesField = CacheStore.class.getDeclaredField("ses");
+                if (!sesHolders.containsKey(cfgStore)) {
+                    sesHolder0 = new ThreadLocal<>();
+
+                    Field sesField = CacheStore.class.getDeclaredField("ses");
 
-                sesField.setAccessible(true);
+                    sesField.setAccessible(true);
 
-                sesField.set(cfgStore, new ThreadLocalSession(sesHolder));
+                    sesField.set(cfgStore, new ThreadLocalSession(sesHolder0));
 
-                sesEnabled = true;
+                    sesHolders.put(cfgStore, sesHolder0);
+                }
+                else
+                    sesHolder0 = sesHolders.get(cfgStore);
             }
             catch (IllegalAccessException | NoSuchFieldException e) {
                 throw new IgniteCheckedException(e);
             }
         }
-        else
-            sesEnabled = true;
+
+        sesHolder = sesHolder0;
 
         locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class);
+
+        assert sesHolder != null || cfgStore == null;
     }
 
     /**
@@ -236,7 +246,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
 
             Object val = null;
 
-            boolean ses = initSession(tx);
+            initSession(tx);
 
             try {
                 val = singleThreadGate.load(key);
@@ -251,8 +261,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                if (ses)
-                    sesHolder.set(null);
+                sesHolder.set(null);
             }
 
             if (log.isDebugEnabled())
@@ -374,7 +383,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Loading values from store for keys: " + keys0);
 
-            boolean ses = initSession(tx);
+            initSession(tx);
 
             try {
                 CI2<K, Object> c = new CI2<K, Object>() {
@@ -419,8 +428,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                if (ses)
-                    sesHolder.set(null);
+                sesHolder.set(null);
             }
 
             if (log.isDebugEnabled())
@@ -506,7 +514,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Storing value in cache store [key=" + key + ", 
val=" + val + ']');
 
-            boolean ses = initSession(tx);
+            initSession(tx);
 
             try {
                 store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) 
: val));
@@ -518,8 +526,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
                 throw new IgniteCheckedException(e);
             }
             finally {
-                if (ses)
-                    sesHolder.set(null);
+                sesHolder.set(null);
             }
 
             if (log.isDebugEnabled())
@@ -575,7 +582,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
                 for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : 
map.entrySet())
                     entries.add(new CacheEntryImpl<>(e.getKey(), locStore ? 
e.getValue() : e.getValue().get1()));
 
-                boolean ses = initSession(tx);
+                initSession(tx);
 
                 try {
                     store.writeAll(entries);
@@ -596,8 +603,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
                     throw new IgniteCheckedException(e);
                 }
                 finally {
-                    if (ses)
-                        sesHolder.set(null);
+                    sesHolder.set(null);
                 }
 
                 if (log.isDebugEnabled())
@@ -628,7 +634,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Removing value from cache store [key=" + key + ']');
 
-            boolean ses = initSession(tx);
+            initSession(tx);
 
             try {
                 store.delete(key);
@@ -640,8 +646,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
                 throw new IgniteCheckedException(e);
             }
             finally {
-                if (ses)
-                    sesHolder.set(null);
+                sesHolder.set(null);
             }
 
             if (log.isDebugEnabled())
@@ -677,7 +682,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Removing values from cache store [keys=" + keys0 + 
']');
 
-            boolean ses = initSession(tx);
+            initSession(tx);
 
             try {
                 store.deleteAll(keys0);
@@ -692,8 +697,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
                 throw new IgniteCheckedException(e);
             }
             finally {
-                if (ses)
-                    sesHolder.set(null);
+                sesHolder.set(null);
             }
 
             if (log.isDebugEnabled())
@@ -728,17 +732,15 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
     public void txEnd(IgniteTx tx, boolean commit) throws 
IgniteCheckedException {
         assert store != null;
 
-        boolean ses = initSession(tx);
+        initSession(tx);
 
         try {
             store.txEnd(commit);
         }
         finally {
-            if (ses) {
-                sesHolder.set(null);
+            sesHolder.set(null);
 
-                ((GridMetadataAware)tx).removeMeta(SES_ATTR);
-            }
+            ((GridMetadataAware)tx).removeMeta(SES_ATTR);
         }
     }
 
@@ -760,12 +762,8 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
 
     /**
      * @param tx Current transaction.
-     * @return {@code True} if thread local session was initialized.
      */
-    private boolean initSession(@Nullable IgniteTx tx) {
-        if (!sesEnabled)
-            return false;
-
+    private void initSession(@Nullable IgniteTx tx) {
         SessionData ses;
 
         if (tx != null) {
@@ -781,8 +779,6 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
             ses = new SessionData(null, cctx.name());
 
         sesHolder.set(ses);
-
-        return true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 23f5ce3..186f2a0 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1214,7 +1214,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         assert !ctx.dr().receiveEnabled(); // Cannot update in batches during 
DR due to possible conflicts.
         assert !req.returnValue() || req.operation() == TRANSFORM; // Should 
not request return values for putAll.
 
-        if (!F.isEmpty(req.filter())) {
+        if (!F.isEmpty(req.filter()) && ctx.loadPreviousValue()) {
             try {
                 reloadIfNeeded(locked);
             }
@@ -1416,7 +1416,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                         V old = entry.innerGet(
                              null,
                             /*read swap*/true,
-                            /*read through*/true,
+                            /*read through*/ctx.loadPreviousValue(),
                             /*fail fast*/false,
                             /*unmarshal*/true,
                             /*metrics*/true,
@@ -1450,7 +1450,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                         V old = entry.innerGet(
                             null,
                             /*read swap*/true,
-                            /*read through*/true,
+                            /*read through*/ctx.loadPreviousValue(),
                             /*fail fast*/false,
                             /*unmarshal*/true,
                             /*metrics*/true,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 978072e..c4080d0 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1148,7 +1148,7 @@ public class GridLocalAtomicCache<K, V> extends 
GridCacheAdapter<K, V> {
                         if (intercept) {
                             V old = entry.innerGet(null,
                                 /*swap*/true,
-                                /*read-through*/true,
+                                /*read-through*/ctx.loadPreviousValue(),
                                 /*fail-fast*/false,
                                 /*unmarshal*/true,
                                 /**update-metrics*/true,
@@ -1179,7 +1179,7 @@ public class GridLocalAtomicCache<K, V> extends 
GridCacheAdapter<K, V> {
                         if (intercept) {
                             V old = entry.innerGet(null,
                                 /*swap*/true,
-                                /*read-through*/true,
+                                /*read-through*/ctx.loadPreviousValue(),
                                 /*fail-fast*/false,
                                 /*unmarshal*/true,
                                 /**update-metrics*/true,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 123072e..11cd1af 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1942,7 +1942,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends 
IgniteTxAdapter<K, V>
 
                             V old = null;
 
-                            boolean readThrough = !F.isEmptyOrNulls(filter) && 
!F.isAlwaysTrue(filter);
+                            boolean readThrough =
+                                cacheCtx.loadPreviousValue() && 
!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
 
                             if (optimistic()) {
                                 try {
@@ -2275,7 +2276,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends 
IgniteTxAdapter<K, V>
                                 if (!hasPrevVal)
                                     v = cached.innerGet(this,
                                         /*swap*/true,
-                                        /*read-through*/true,
+                                        /*read-through*/invoke || 
cacheCtx.loadPreviousValue(),
                                         /*failFast*/false,
                                         /*unmarshal*/true,
                                         /*metrics*/!invoke,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java
index 8075172..01ae3a6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java
@@ -103,6 +103,7 @@ public class GridCacheJdbcBlobStoreMultithreadedSelfTest 
extends GridCommonAbstr
         cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store()));
         cc.setReadThrough(true);
         cc.setWriteThrough(true);
+        cc.setLoadPreviousValue(true);
 
         c.setCacheConfiguration(cc);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
index bf8206c..ac51490 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
@@ -132,6 +132,7 @@ public abstract class IgniteCacheAbstractTest extends 
GridCommonAbstractTest {
             cfg.setCacheStoreFactory(new 
FactoryBuilder.SingletonFactory(store));
             cfg.setReadThrough(true);
             cfg.setWriteThrough(true);
+            cfg.setLoadPreviousValue(true);
         }
 
         if (cacheMode() == PARTITIONED)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicStoreSessionTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicStoreSessionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicStoreSessionTest.java
new file mode 100644
index 0000000..4488e04
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicStoreSessionTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.integration;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicStoreSessionTest extends 
IgniteCacheStoreSessionAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return PRIMARY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
index 7564f06..8bff838 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
@@ -28,8 +28,6 @@ import javax.cache.processor.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
 
-import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
-
 /**
  *
  */
@@ -72,88 +70,75 @@ public abstract class IgniteCacheLoaderWriterAbstractTest 
extends IgniteCacheAbs
         storeMap.clear();
     }
 
-    protected boolean putFromPrimary() {
-        return atomicityMode() == ATOMIC;
-    }
-
     /**
      * @throws Exception If failed.
      */
     public void testLoaderWriter() throws Exception {
-        final Object key = Integer.MAX_VALUE;
-
-        for (int i = 0; i < gridCount(); i++) {
-            log.info("Test with grid: " + i);
+        IgniteCache<Object, Object> cache = jcache(0);
 
-            storeMap.clear();
+        Object key = primaryKey(cache);
 
-            ldrCallCnt.set(0);
-            writerCallCnt.set(0);
+        assertNull(cache.get(key));
 
-            IgniteCache<Object, Object> cache = jcache(i);
+        checkCalls(1, 0);
 
-            assertNull(cache.get(key));
+        storeMap.put(key, "test");
 
-            checkCalls(1, 0);
+        assertEquals("test", cache.get(key));
 
-            storeMap.put(key, "test");
+        checkCalls(1, 0);
 
-            assertEquals("test", cache.get(key));
+        assertTrue(storeMap.containsKey(key));
 
-            checkCalls(2, 0);
+        cache.remove(key);
 
-            assertTrue(storeMap.containsKey(key));
+        checkCalls(0, 1);
 
-            cache.remove(key);
-
-            checkCalls(2, 1);
-
-            assertFalse(storeMap.containsKey(key));
+        assertFalse(storeMap.containsKey(key));
 
-            assertNull(cache.get(key));
+        assertNull(cache.get(key));
 
-            checkCalls(3, 1);
+        checkCalls(1, 0);
 
-            cache.put(key, "test1");
+        cache.put(key, "test1");
 
-            checkCalls(3, 2);
+        checkCalls(0, 1);
 
-            assertEquals("test1", storeMap.get(key));
+        assertEquals("test1", storeMap.get(key));
 
-            assertEquals("test1", cache.get(key));
+        assertEquals("test1", cache.get(key));
 
-            checkCalls(3, 2);
+        checkCalls(0, 0);
 
-            cache.invoke(key, new EntryProcessor<Object, Object, Object>() {
-                @Override public Object process(MutableEntry<Object, Object> 
e, Object... args) {
-                    e.setValue("test2");
+        cache.invoke(key, new EntryProcessor<Object, Object, Object>() {
+            @Override public Object process(MutableEntry<Object, Object> e, 
Object... args) {
+                e.setValue("test2");
 
-                    return null;
-                }
-            });
+                return null;
+            }
+        });
 
-            checkCalls(3, 3);
+        checkCalls(0, 1);
 
-            assertEquals("test2", storeMap.get(key));
+        assertEquals("test2", storeMap.get(key));
 
-            assertEquals("test2", cache.get(key));
+        assertEquals("test2", cache.get(key));
 
-            checkCalls(3, 3);
+        checkCalls(0, 0);
 
-            cache.invoke(key, new EntryProcessor<Object, Object, Object>() {
-                @Override public Object process(MutableEntry<Object, Object> 
e, Object... args) {
-                    e.remove();
+        cache.invoke(key, new EntryProcessor<Object, Object, Object>() {
+            @Override public Object process(MutableEntry<Object, Object> e, 
Object... args) {
+                e.remove();
 
-                    return null;
-                }
-            });
+                return null;
+            }
+        });
 
-            checkCalls(3, 4);
+        checkCalls(0, 1);
 
-            assertFalse(storeMap.containsKey(key));
+        assertFalse(storeMap.containsKey(key));
 
-            assertNull(cache.get(key));
-        }
+        assertNull(cache.get(key));
     }
 
     /**
@@ -162,43 +147,63 @@ public abstract class IgniteCacheLoaderWriterAbstractTest 
extends IgniteCacheAbs
     public void testLoaderWriterBulk() throws Exception {
         Map<Object, Object> vals = new HashMap<>();
 
-        for (int i = 0; i < 100; i++)
-            vals.put(i, i);
+        IgniteCache<Object, Object> cache = jcache(0);
 
-        for (int i = 0; i < gridCount(); i++) {
-            log.info("Test with grid: " + i);
+        for (Object key : primaryKeys(cache, 100, 0))
+            vals.put(key, key);
 
-            storeMap.clear();
+        assertTrue(cache.getAll(vals.keySet()).isEmpty());
 
-            ldrCallCnt.set(0);
-            writerCallCnt.set(0);
+        checkCalls(1, 0);
 
-            IgniteCache<Object, Object> cache = jcache(i);
+        storeMap.putAll(vals);
 
-            assertTrue(cache.getAll(vals.keySet()).isEmpty());
+        assertEquals(vals, cache.getAll(vals.keySet()));
 
-            int expLoads = gridCount();
+        checkCalls(1, 0);
 
-            checkCalls(expLoads, 0);
+        for (Object key : vals.keySet())
+            assertEquals(key, storeMap.get(key));
 
-            storeMap.putAll(vals);
+        cache.removeAll(vals.keySet());
 
-            assertEquals(vals, cache.getAll(vals.keySet()));
+        checkCalls(0, 1);
 
-            expLoads += gridCount();
+        for (Object key : vals.keySet())
+            assertFalse(storeMap.containsKey(key));
 
-            checkCalls(expLoads, 0);
+        cache.putAll(vals);
 
-            for (Object key : vals.keySet())
-                assertTrue(storeMap.contains(key));
+        checkCalls(0, 1);
 
-            cache.removeAll(vals.keySet());
+        for (Object key : vals.keySet())
+            assertEquals(key, storeMap.get(key));
 
-            checkCalls(expLoads, gridCount());
+        cache.invokeAll(vals.keySet(), new EntryProcessor<Object, Object, 
Object>() {
+            @Override public Object process(MutableEntry<Object, Object> 
entry, Object... args) {
+                entry.setValue("test1");
 
-            for (Object key : vals.keySet())
-                assertFalse(storeMap.containsKey(key));
-        }
+                return null;
+            }
+        });
+
+        checkCalls(0, 1);
+
+        for (Object key : vals.keySet())
+            assertEquals("test1", storeMap.get(key));
+
+        cache.invokeAll(vals.keySet(), new EntryProcessor<Object, Object, 
Object>() {
+            @Override public Object process(MutableEntry<Object, Object> 
entry, Object... args) {
+                entry.remove();
+
+                return null;
+            }
+        });
+
+        checkCalls(0, 1);
+
+        for (Object key : vals.keySet())
+            assertFalse(storeMap.containsKey(key));
     }
 
     /**
@@ -207,8 +212,10 @@ public abstract class IgniteCacheLoaderWriterAbstractTest 
extends IgniteCacheAbs
      */
     private void checkCalls(int expLdr, int expWriter) {
         assertEquals(expLdr, ldrCallCnt.get());
-
         assertEquals(expWriter, writerCallCnt.get());
+
+        ldrCallCnt.set(0);
+        writerCallCnt.set(0);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java
new file mode 100644
index 0000000..87de0bf
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java
@@ -0,0 +1,303 @@
+package org.apache.ignite.internal.processors.cache.integration;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import javax.cache.processor.*;
+import java.util.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+
+/**
+ *
+ */
+public abstract class IgniteCacheStoreSessionAbstractTest extends 
IgniteCacheAbstractTest {
+    /** */
+    protected static volatile List<ExpectedData> expData;
+
+    /** */
+    protected static final String CACHE_NAME1 = "cache1";
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TestStore store = new TestStore(); // Use the same store instance for 
both caches.
+
+        assert cfg.getCacheConfiguration().length == 1;
+
+        CacheConfiguration ccfg0 = cfg.getCacheConfiguration()[0];
+
+        ccfg0.setReadThrough(true);
+        ccfg0.setWriteThrough(true);
+
+        ccfg0.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+
+        CacheConfiguration ccfg1 = cacheConfiguration(gridName);
+
+        ccfg1.setReadThrough(true);
+        ccfg1.setWriteThrough(true);
+
+        ccfg1.setName(CACHE_NAME1);
+
+        ccfg1.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+
+        cfg.setCacheConfiguration(ccfg0, ccfg1);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        expData = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        expData = Collections.synchronizedList(new ArrayList<ExpectedData>());
+
+        super.beforeTestsStarted();
+    }
+
+    /**
+     * @param cache Cache.
+     * @param cnt Keys count.
+     * @return Keys.
+     * @throws Exception If failed.
+     */
+    protected List<Integer> testKeys(IgniteCache cache, int cnt) throws 
Exception {
+        return primaryKeys(cache, cnt, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStoreSession() throws Exception {
+        assertNull(jcache(0).getName());
+
+        assertEquals(CACHE_NAME1, ignite(0).jcache(CACHE_NAME1).getName());
+
+        testStoreSession(jcache(0));
+
+        testStoreSession(ignite(0).jcache(CACHE_NAME1));
+    }
+
+    /**
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void testStoreSession(IgniteCache<Object, Object> cache) throws 
Exception {
+        Set<Integer> keys = new HashSet<>(primaryKeys(cache, 3, 100_000));
+
+        Integer key = keys.iterator().next();
+
+        boolean tx = atomicityMode() == TRANSACTIONAL;
+
+        expData.add(new ExpectedData(false, "load", new HashMap<>(), 
cache.getName()));
+
+        assertEquals(key, cache.get(key));
+
+        assertTrue(expData.isEmpty());
+
+        expData.add(new ExpectedData(false, "loadAll", new HashMap<>(), 
cache.getName()));
+
+        assertEquals(3, cache.getAll(keys).size());
+
+        assertTrue(expData.isEmpty());
+
+        expectedData(tx, "write", cache.getName());
+
+        cache.put(key, key);
+
+        assertTrue(expData.isEmpty());
+
+        expectedData(tx, "write", cache.getName());
+
+        cache.invoke(key, new EntryProcessor<Object, Object, Object>() {
+            @Override public Object process(MutableEntry<Object, Object> e, 
Object... args) {
+                e.setValue("val1");
+
+                return null;
+            }
+        });
+
+        assertTrue(expData.isEmpty());
+
+        expectedData(tx, "delete", cache.getName());
+
+        cache.remove(key);
+
+        assertTrue(expData.isEmpty());
+
+        Map<Object, Object> vals = new HashMap<>();
+
+        for (Object key0 : keys)
+            vals.put(key0, key0);
+
+        expectedData(tx, "writeAll", cache.getName());
+
+        cache.putAll(vals);
+
+        assertTrue(expData.isEmpty());
+
+        expectedData(tx, "deleteAll", cache.getName());
+
+        cache.removeAll(keys);
+
+        assertTrue(expData.isEmpty());
+    }
+
+    /**
+     * @param tx {@code True} is transaction is expected.
+     * @param expMtd Expected method.
+     * @param expCacheName Expected cache name.
+     */
+    private void expectedData(boolean tx, String expMtd, String expCacheName) {
+        expData.add(new ExpectedData(tx, expMtd, new HashMap<>(), 
expCacheName));
+
+        if (tx)
+            expData.add(new ExpectedData(true, "txEnd", F.<Object, 
Object>asMap(0, expMtd), expCacheName));
+    }
+
+    /**
+     *
+     */
+    static class ExpectedData {
+        /** */
+        private final boolean tx;
+
+        /** */
+        private final String expMtd;
+
+        /** */
+        private  final Map<Object, Object> expProps;
+
+        /** */
+        private final String expCacheName;
+
+        /**
+         * @param tx {@code True} if transaction is enabled.
+         * @param expMtd Expected method.
+         * @param expProps Expected properties.
+         * @param expCacheName Expected cache name.
+         */
+        public ExpectedData(boolean tx, String expMtd, Map<Object, Object> 
expProps, String expCacheName) {
+            this.tx = tx;
+            this.expMtd = expMtd;
+            this.expProps = expProps;
+            this.expCacheName = expCacheName;
+        }
+    }
+
+    /**
+     *
+     */
+    private class TestStore extends CacheStore<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, 
@Nullable Object... args) {
+            fail();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void txEnd(boolean commit) throws 
CacheWriterException {
+            log.info("Tx end [commit=" + commit + ", tx=" + 
session().transaction() + ']');
+
+            checkSession("txEnd");
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object load(Object key) throws CacheLoaderException {
+            log.info("Load [key=" + key + ", tx=" + session().transaction() + 
']');
+
+            checkSession("load");
+
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws 
CacheLoaderException {
+            log.info("LoadAll [keys=" + keys + ", tx=" + 
session().transaction() + ']');
+
+            checkSession("loadAll");
+
+            Map<Object, Object> loaded = new HashMap<>();
+
+            for (Object key : keys)
+                loaded.put(key, key);
+
+            return loaded;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) throws 
CacheWriterException {
+            log.info("Write [write=" + entry + ", tx=" + 
session().transaction() + ']');
+
+            checkSession("write");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) 
throws CacheWriterException {
+            log.info("WriteAll: [writeAll=" + entries + ", tx=" + 
session().transaction() + ']');
+
+            checkSession("writeAll");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            log.info("Delete [key=" + key + ", tx=" + session().transaction() 
+ ']');
+
+            checkSession("delete");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void deleteAll(Collection<?> keys) throws 
CacheWriterException {
+            log.info("DeleteAll [keys=" + keys + ", tx=" + 
session().transaction() + ']');
+
+            checkSession("deleteAll");
+        }
+
+        /**
+         * @param mtd Called stored method.
+         */
+        private void checkSession(String mtd) {
+            assertNotNull(ignite());
+
+            assertFalse(expData.isEmpty());
+
+            ExpectedData exp = expData.remove(0);
+
+            assertEquals(exp.expMtd, mtd);
+
+            CacheStoreSession ses = session();
+
+            assertNotNull(ses);
+
+            if (exp.tx)
+                assertNotNull(ses.transaction());
+            else
+                assertNull(ses.transaction());
+
+            Map<Object, Object> props = ses.properties();
+
+            assertNotNull(props);
+
+            assertEquals(exp.expProps, props);
+
+            props.put(props.size(), mtd);
+
+            assertEquals(exp.expCacheName, ses.cacheName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionTest.java
deleted file mode 100644
index d333456..0000000
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionTest.java
+++ /dev/null
@@ -1,435 +0,0 @@
-package org.apache.ignite.internal.processors.cache.integration;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.integration.*;
-import java.util.*;
-
-import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
-import static org.apache.ignite.transactions.IgniteTxIsolation.*;
-import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
-import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
-import static org.gridgain.grid.cache.GridCacheMode.*;
-
-/**
- *
- */
-public class IgniteCacheStoreSessionTest extends IgniteCacheAbstractTest {
-    /** */
-    private static volatile List<ExpectedData> expData;
-
-    /** */
-    private static final String CACHE_NAME1 = "cache1";
-
-    private TestStore store = new TestStore();
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 3;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected GridCacheMode cacheMode() {
-        return PARTITIONED;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected GridCacheAtomicityMode atomicityMode() {
-        return TRANSACTIONAL;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected GridCacheDistributionMode distributionMode() {
-        return PARTITIONED_ONLY;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheStore<?, ?> cacheStore() {
-        return store;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        assert cfg.getCacheConfiguration().length == 1;
-
-        CacheConfiguration ccfg0 = cfg.getCacheConfiguration()[0];
-
-        CacheConfiguration ccfg1 = cacheConfiguration(gridName);
-
-        ccfg1.setName(CACHE_NAME1);
-
-        cfg.setCacheConfiguration(ccfg0, ccfg1);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        super.afterTestsStopped();
-
-        expData = null;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        expData = Collections.synchronizedList(new ArrayList<ExpectedData>());
-
-        super.beforeTestsStarted();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStoreSession() throws Exception {
-        testTxPut(jcache(0), null, null);
-
-        testTxPut(ignite(0).jcache(CACHE_NAME1), null, null);
-
-        testTxRemove(null, null);
-
-        testTxPutRemove(null, null);
-
-        for (IgniteTxConcurrency concurrency : F.asList(PESSIMISTIC)) {
-            for (IgniteTxIsolation isolation : F.asList(REPEATABLE_READ)) {
-                testTxPut(jcache(0), concurrency, isolation);
-
-                testTxRemove(concurrency, isolation);
-
-                testTxPutRemove(concurrency, isolation);
-            }
-        }
-    }
-
-    /**
-     * @param cnt Keys count.
-     * @return Keys.
-     * @throws Exception If failed.
-     */
-    private List<Integer> testKeys(IgniteCache cache, int cnt) throws 
Exception {
-        return primaryKeys(cache, cnt, 0);
-    }
-
-    /**
-     * @param concurrency Concurrency mode.
-     * @param isolation Isolation mode.
-     * @throws Exception If failed.
-     */
-    private void testTxPutRemove(IgniteTxConcurrency concurrency, 
IgniteTxIsolation isolation) throws Exception {
-        log.info("Test tx put/remove [concurrency=" + concurrency + ", 
isolation=" + isolation + ']');
-
-        IgniteCache<Integer, Integer> cache = jcache(0);
-
-        List<Integer> keys = testKeys(cache, 3);
-
-        Integer key1 = keys.get(0);
-        Integer key2 = keys.get(1);
-        Integer key3 = keys.get(2);
-
-        try (IgniteTx tx = startTx(concurrency, isolation)) {
-            log.info("Do tx put1.");
-
-            cache.put(key1, key1);
-
-            log.info("Do tx put2.");
-
-            cache.put(key2, key2);
-
-            log.info("Do tx remove.");
-
-            cache.remove(key3);
-
-            expData.add(new ExpectedData("writeAll", new HashMap<>(), null));
-            expData.add(new ExpectedData("delete", F.<Object, Object>asMap(0, 
"writeAll"), null));
-            expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, 
"writeAll", 1, "delete"), null));
-
-            log.info("Do tx commit.");
-
-            tx.commit();
-        }
-
-        assertEquals(0, expData.size());
-    }
-
-    /**
-     * @param concurrency Concurrency mode.
-     * @param isolation Isolation mode.
-     * @throws Exception If failed.
-     */
-    private void testTxPut(IgniteCache<Object, Object> cache,
-        IgniteTxConcurrency concurrency,
-        IgniteTxIsolation isolation) throws Exception {
-        log.info("Test tx put [concurrency=" + concurrency + ", isolation=" + 
isolation + ']');
-
-        List<Integer> keys = testKeys(cache, 3);
-
-        Integer key1 = keys.get(0);
-
-        try (IgniteTx tx = startTx(concurrency, isolation)) {
-            log.info("Do tx get.");
-
-            cache.get(key1);
-
-            log.info("Do tx put.");
-
-            cache.put(key1, key1);
-
-            expData.add(new ExpectedData("write", new HashMap<>(), 
cache.getName()));
-            expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, 
"write"), cache.getName()));
-
-            log.info("Do tx commit.");
-
-            tx.commit();
-        }
-
-        assertEquals(0, expData.size());
-
-        Integer key2 = keys.get(1);
-        Integer key3 = keys.get(2);
-
-        try (IgniteTx tx = startTx(concurrency, isolation);) {
-            log.info("Do tx put1.");
-
-            cache.put(key2, key2);
-
-            log.info("Do tx put2.");
-
-            cache.put(key3, key3);
-
-            expData.add(new ExpectedData("writeAll", new HashMap<>(), 
cache.getName()));
-            expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, 
"writeAll"), cache.getName()));
-
-            log.info("Do tx commit.");
-
-            tx.commit();
-        }
-
-        assertEquals(0, expData.size());
-    }
-
-    /**
-     * @param concurrency Concurrency mode.
-     * @param isolation Isolation mode.
-     * @throws Exception If failed.
-     */
-    private void testTxRemove(IgniteTxConcurrency concurrency, 
IgniteTxIsolation isolation) throws Exception {
-        log.info("Test tx remove [concurrency=" + concurrency + ", isolation=" 
+ isolation + ']');
-
-        IgniteCache<Integer, Integer> cache = jcache(0);
-
-        List<Integer> keys = testKeys(cache, 3);
-
-        Integer key1 = keys.get(0);
-
-        try (IgniteTx tx = startTx(concurrency, isolation)) {
-            log.info("Do tx get.");
-
-            cache.get(key1);
-
-            log.info("Do tx remove.");
-
-            cache.remove(key1, key1);
-
-            expData.add(new ExpectedData("delete", new HashMap<>(), null));
-            expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, 
"delete"), null));
-
-            log.info("Do tx commit.");
-
-            tx.commit();
-        }
-
-        assertEquals(0, expData.size());
-
-        Integer key2 = keys.get(1);
-        Integer key3 = keys.get(2);
-
-        try (IgniteTx tx = startTx(concurrency, isolation);) {
-            log.info("Do tx remove1.");
-
-            cache.remove(key2, key2);
-
-            log.info("Do tx remove2.");
-
-            cache.remove(key3, key3);
-
-            expData.add(new ExpectedData("deleteAll", new HashMap<>(), null));
-            expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, 
"deleteAll"), null));
-
-            log.info("Do tx commit.");
-
-            tx.commit();
-        }
-
-        assertEquals(0, expData.size());
-    }
-
-    /**
-     * @param concurrency Concurrency mode.
-     * @param isolation Isolation mode.
-     * @return Transaction.
-     */
-    private IgniteTx startTx(IgniteTxConcurrency concurrency, 
IgniteTxIsolation isolation) {
-        IgniteTransactions txs = ignite(0).transactions();
-
-        if (concurrency == null)
-            return txs.txStart();
-
-        return txs.txStart(concurrency, isolation);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSessionCrossCacheTx() throws Exception {
-        IgniteCache<Object, Object> cache0 = ignite(0).jcache(null);
-
-        IgniteCache<Object, Object> cache1 = ignite(0).jcache(CACHE_NAME1);
-
-        Integer key1 = primaryKey(cache0);
-        Integer key2 = primaryKeys(cache1, 1, key1 + 1).get(0);
-
-        try (IgniteTx tx = startTx(null, null)) {
-            cache1.put(key2, key2);
-
-            cache0.put(key1, key1);
-
-            expData.add(new ExpectedData("writeAll", new HashMap<>(), null));
-            expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, 
"writeAll"), null));
-
-            tx.commit();
-        }
-
-        assertEquals(0, expData.size());
-    }
-
-    /**
-     *
-     */
-    static class ExpectedData {
-        /** */
-        private final String expMtd;
-
-        /** */
-        private  final Map<Object, Object> expProps;
-
-        /** */
-        private final String expCacheName;
-
-        /**
-         * @param expMtd Expected method.
-         * @param expProps Expected properties.
-         * @param expCacheName Expected cache name.
-         */
-        public ExpectedData(String expMtd, Map<Object, Object> expProps, 
String expCacheName) {
-            this.expMtd = expMtd;
-            this.expProps = expProps;
-            this.expCacheName = expCacheName;
-        }
-    }
-
-    /**
-     *
-     */
-    private class TestStore extends CacheStore<Object, Object> {
-        /** {@inheritDoc} */
-        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, 
@Nullable Object... args) {
-            fail();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void txEnd(boolean commit) throws 
CacheWriterException {
-            log.info("Tx end [commit=" + commit + ", tx=" + 
session().transaction() + ']');
-
-            checkSession("txEnd");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object load(Object key) throws CacheLoaderException {
-            log.info("Load [key=" + key + ", tx=" + session().transaction() + 
']');
-
-            checkSession("load");
-
-            return key;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws 
CacheLoaderException {
-            log.info("LoadAll [keys=" + keys + ", tx=" + 
session().transaction() + ']');
-
-            checkSession("loadAll");
-
-            Map<Object, Object> loaded = new HashMap<>();
-
-            for (Object key : keys)
-                loaded.put(key, key);
-
-            return loaded;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Cache.Entry<?, ?> entry) throws 
CacheWriterException {
-            log.info("Write [write=" + entry + ", tx=" + 
session().transaction() + ']');
-
-            checkSession("write");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) 
throws CacheWriterException {
-            log.info("WriteAll: [writeAll=" + entries + ", tx=" + 
session().transaction() + ']');
-
-            checkSession("writeAll");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void delete(Object key) throws CacheWriterException {
-            log.info("Delete [key=" + key + ", tx=" + session().transaction() 
+ ']');
-
-            checkSession("delete");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void deleteAll(Collection<?> keys) throws 
CacheWriterException {
-            log.info("DeleteAll [keys=" + keys + ", tx=" + 
session().transaction() + ']');
-
-            checkSession("deleteAll");
-        }
-
-        /**
-         * @param mtd Called stored method.
-         */
-        private void checkSession(String mtd) {
-            assertFalse(expData.isEmpty());
-
-            ExpectedData exp = expData.remove(0);
-
-            assertEquals(exp.expMtd, mtd);
-
-            CacheStoreSession ses = session();
-
-            assertNotNull(ses);
-
-            assertNotNull(ses.transaction());
-
-            Map<Object, Object> props = ses.properties();
-
-            assertNotNull(props);
-
-            assertEquals(exp.expProps, props);
-
-            props.put(props.size(), mtd);
-
-            assertEquals(exp.expCacheName, ses.cacheName());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
new file mode 100644
index 0000000..99900cb
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.integration;
+
+import org.apache.ignite.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.util.typedef.*;
+
+import java.util.*;
+
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheTxStoreSessionTest extends 
IgniteCacheStoreSessionAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStoreSessionTx() throws Exception {
+        testTxPut(jcache(0), null, null);
+
+        testTxPut(ignite(0).jcache(CACHE_NAME1), null, null);
+
+        testTxRemove(null, null);
+
+        testTxPutRemove(null, null);
+
+        for (IgniteTxConcurrency concurrency : F.asList(PESSIMISTIC)) {
+            for (IgniteTxIsolation isolation : F.asList(REPEATABLE_READ)) {
+                testTxPut(jcache(0), concurrency, isolation);
+
+                testTxRemove(concurrency, isolation);
+
+                testTxPutRemove(concurrency, isolation);
+            }
+        }
+    }
+
+    /**
+     * @param concurrency Concurrency mode.
+     * @param isolation Isolation mode.
+     * @throws Exception If failed.
+     */
+    private void testTxPutRemove(IgniteTxConcurrency concurrency, 
IgniteTxIsolation isolation) throws Exception {
+        log.info("Test tx put/remove [concurrency=" + concurrency + ", 
isolation=" + isolation + ']');
+
+        IgniteCache<Integer, Integer> cache = jcache(0);
+
+        List<Integer> keys = testKeys(cache, 3);
+
+        Integer key1 = keys.get(0);
+        Integer key2 = keys.get(1);
+        Integer key3 = keys.get(2);
+
+        try (IgniteTx tx = startTx(concurrency, isolation)) {
+            log.info("Do tx put1.");
+
+            cache.put(key1, key1);
+
+            log.info("Do tx put2.");
+
+            cache.put(key2, key2);
+
+            log.info("Do tx remove.");
+
+            cache.remove(key3);
+
+            expData.add(new ExpectedData(true, "writeAll", new HashMap<>(), 
null));
+            expData.add(new ExpectedData(true, "delete", F.<Object, 
Object>asMap(0, "writeAll"), null));
+            expData.add(new ExpectedData(true, "txEnd", F.<Object, 
Object>asMap(0, "writeAll", 1, "delete"), null));
+
+            log.info("Do tx commit.");
+
+            tx.commit();
+        }
+
+        assertEquals(0, expData.size());
+    }
+
+    /**
+     * @param cache Cache.
+     * @param concurrency Concurrency mode.
+     * @param isolation Isolation mode.
+     * @throws Exception If failed.
+     */
+    private void testTxPut(IgniteCache<Object, Object> cache,
+        IgniteTxConcurrency concurrency,
+        IgniteTxIsolation isolation) throws Exception {
+        log.info("Test tx put [concurrency=" + concurrency + ", isolation=" + 
isolation + ']');
+
+        List<Integer> keys = testKeys(cache, 3);
+
+        Integer key1 = keys.get(0);
+
+        try (IgniteTx tx = startTx(concurrency, isolation)) {
+            log.info("Do tx get.");
+
+            cache.get(key1);
+
+            log.info("Do tx put.");
+
+            cache.put(key1, key1);
+
+            expData.add(new ExpectedData(true, "write", new HashMap<>(), 
cache.getName()));
+            expData.add(new ExpectedData(true, "txEnd", F.<Object, 
Object>asMap(0, "write"), cache.getName()));
+
+            log.info("Do tx commit.");
+
+            tx.commit();
+        }
+
+        assertEquals(0, expData.size());
+
+        Integer key2 = keys.get(1);
+        Integer key3 = keys.get(2);
+
+        try (IgniteTx tx = startTx(concurrency, isolation)) {
+            log.info("Do tx put1.");
+
+            cache.put(key2, key2);
+
+            log.info("Do tx put2.");
+
+            cache.put(key3, key3);
+
+            expData.add(new ExpectedData(true, "writeAll", new HashMap<>(), 
cache.getName()));
+            expData.add(new ExpectedData(true, "txEnd", F.<Object, 
Object>asMap(0, "writeAll"), cache.getName()));
+
+            log.info("Do tx commit.");
+
+            tx.commit();
+        }
+
+        assertEquals(0, expData.size());
+    }
+
+    /**
+     * @param concurrency Concurrency mode.
+     * @param isolation Isolation mode.
+     * @throws Exception If failed.
+     */
+    private void testTxRemove(IgniteTxConcurrency concurrency, 
IgniteTxIsolation isolation) throws Exception {
+        log.info("Test tx remove [concurrency=" + concurrency + ", isolation=" 
+ isolation + ']');
+
+        IgniteCache<Integer, Integer> cache = jcache(0);
+
+        List<Integer> keys = testKeys(cache, 3);
+
+        Integer key1 = keys.get(0);
+
+        try (IgniteTx tx = startTx(concurrency, isolation)) {
+            log.info("Do tx get.");
+
+            cache.get(key1);
+
+            log.info("Do tx remove.");
+
+            cache.remove(key1, key1);
+
+            expData.add(new ExpectedData(true, "delete", new HashMap<>(), 
null));
+            expData.add(new ExpectedData(true, "txEnd", F.<Object, 
Object>asMap(0, "delete"), null));
+
+            log.info("Do tx commit.");
+
+            tx.commit();
+        }
+
+        assertEquals(0, expData.size());
+
+        Integer key2 = keys.get(1);
+        Integer key3 = keys.get(2);
+
+        try (IgniteTx tx = startTx(concurrency, isolation)) {
+            log.info("Do tx remove1.");
+
+            cache.remove(key2, key2);
+
+            log.info("Do tx remove2.");
+
+            cache.remove(key3, key3);
+
+            expData.add(new ExpectedData(true, "deleteAll", new HashMap<>(), 
null));
+            expData.add(new ExpectedData(true, "txEnd", F.<Object, 
Object>asMap(0, "deleteAll"), null));
+
+            log.info("Do tx commit.");
+
+            tx.commit();
+        }
+
+        assertEquals(0, expData.size());
+    }
+
+    /**
+     * @param concurrency Concurrency mode.
+     * @param isolation Isolation mode.
+     * @return Transaction.
+     */
+    private IgniteTx startTx(IgniteTxConcurrency concurrency, 
IgniteTxIsolation isolation) {
+        IgniteTransactions txs = ignite(0).transactions();
+
+        if (concurrency == null)
+            return txs.txStart();
+
+        return txs.txStart(concurrency, isolation);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSessionCrossCacheTx() throws Exception {
+        IgniteCache<Object, Object> cache0 = ignite(0).jcache(null);
+
+        IgniteCache<Object, Object> cache1 = ignite(0).jcache(CACHE_NAME1);
+
+        Integer key1 = primaryKey(cache0);
+        Integer key2 = primaryKeys(cache1, 1, key1 + 1).get(0);
+
+        try (IgniteTx tx = startTx(null, null)) {
+            cache0.put(key1, 1);
+
+            cache1.put(key2, 0);
+
+            expData.add(new ExpectedData(true, "writeAll", new HashMap<>(), 
null));
+            expData.add(new ExpectedData(true, "txEnd", F.<Object, 
Object>asMap(0, "writeAll"), null));
+
+            tx.commit();
+        }
+
+        assertEquals(0, expData.size());
+
+        try (IgniteTx tx = startTx(null, null)) {
+            cache1.put(key1, 1);
+
+            cache0.put(key2, 0);
+
+            expData.add(new ExpectedData(true, "writeAll", new HashMap<>(), 
null));
+            expData.add(new ExpectedData(true, "txEnd", F.<Object, 
Object>asMap(0, "writeAll"), null));
+
+            tx.commit();
+        }
+
+        assertEquals(0, expData.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/cache/store/GridCacheLoadOnlyStoreAdapterSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/cache/store/GridCacheLoadOnlyStoreAdapterSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/cache/store/GridCacheLoadOnlyStoreAdapterSelfTest.java
index 3a761f8..931f428 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/cache/store/GridCacheLoadOnlyStoreAdapterSelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/cache/store/GridCacheLoadOnlyStoreAdapterSelfTest.java
@@ -50,6 +50,7 @@ public class GridCacheLoadOnlyStoreAdapterSelfTest extends 
GridCacheAbstractSelf
         cfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new 
TestStore()));
         cfg.setReadThrough(true);
         cfg.setWriteThrough(true);
+        cfg.setLoadPreviousValue(true);
 
         return cfg;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractSelfTest.java
index 4860f4d..e70ca7a 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractSelfTest.java
@@ -239,6 +239,7 @@ public abstract class GridCacheAbstractSelfTest extends 
GridCommonAbstractTest {
             cfg.setCacheStoreFactory(new 
FactoryBuilder.SingletonFactory(store));
             cfg.setReadThrough(true);
             cfg.setWriteThrough(true);
+            cfg.setLoadPreviousValue(true);
         }
 
         cfg.setSwapEnabled(swapEnabled());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreAbstractTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreAbstractTest.java
index 0bb03b8..3c9c1b2 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreAbstractTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreAbstractTest.java
@@ -95,6 +95,7 @@ public abstract class GridCacheBasicStoreAbstractTest extends 
GridCommonAbstract
         cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
         cc.setReadThrough(true);
         cc.setWriteThrough(true);
+        cc.setLoadPreviousValue(true);
 
         c.setCacheConfiguration(cc);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java
index bd8e8ed..88d8638 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java
@@ -81,6 +81,7 @@ public abstract class 
GridCacheBasicStoreMultithreadedAbstractTest extends GridC
         cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
         cc.setReadThrough(true);
         cc.setWriteThrough(true);
+        cc.setLoadPreviousValue(true);
 
         c.setCacheConfiguration(cc);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
index d54cd41..db6f67c 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
@@ -746,6 +746,7 @@ public class GridCacheConfigurationConsistencySelfTest 
extends GridCommonAbstrac
                 cfg.setCacheStoreFactory(new 
FactoryBuilder.SingletonFactory(new TestStore()));
                 cfg.setReadThrough(true);
                 cfg.setWriteThrough(true);
+                cfg.setLoadPreviousValue(true);
 
                 return null;
             }
@@ -787,6 +788,7 @@ public class GridCacheConfigurationConsistencySelfTest 
extends GridCommonAbstrac
                 cc.setCacheStoreFactory(new 
FactoryBuilder.SingletonFactory(new TestStore()));
                 cc.setReadThrough(true);
                 cc.setWriteThrough(true);
+                cc.setLoadPreviousValue(true);
 
                 return null;
             }
@@ -825,6 +827,7 @@ public class GridCacheConfigurationConsistencySelfTest 
extends GridCommonAbstrac
                 cc.setCacheStoreFactory(new 
FactoryBuilder.SingletonFactory(new TestStore()));
                 cc.setReadThrough(true);
                 cc.setWriteThrough(true);
+                cc.setLoadPreviousValue(true);
 
                 return null;
             }
@@ -869,6 +872,7 @@ public class GridCacheConfigurationConsistencySelfTest 
extends GridCommonAbstrac
                 cc.setCacheStoreFactory(new 
FactoryBuilder.SingletonFactory(new TestStore()));
                 cc.setReadThrough(true);
                 cc.setWriteThrough(true);
+                cc.setLoadPreviousValue(true);
 
                 return null;
             }
@@ -915,6 +919,7 @@ public class GridCacheConfigurationConsistencySelfTest 
extends GridCommonAbstrac
                 cc.setCacheStoreFactory(new 
FactoryBuilder.SingletonFactory(new TestStore()));
                 cc.setReadThrough(true);
                 cc.setWriteThrough(true);
+                cc.setLoadPreviousValue(true);
 
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
index 0f2de67..2ee15b4 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
@@ -94,6 +94,7 @@ public abstract class 
GridCacheGetAndTransformStoreAbstractTest extends GridComm
         cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
         cc.setReadThrough(true);
         cc.setWriteThrough(true);
+        cc.setLoadPreviousValue(true);
 
         c.setCacheConfiguration(cc);
 

Reply via email to