# ignite-42
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/25f7984a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/25f7984a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/25f7984a Branch: refs/heads/ignite-32 Commit: 25f7984a2a581d94fac6bc282cc9f8fb8097c3b7 Parents: ee7bb3f Author: sboikov <sboi...@gridgain.com> Authored: Wed Jan 21 10:28:40 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jan 21 12:36:45 2015 +0300 ---------------------------------------------------------------------- .../integration/GridClientAbstractSelfTest.java | 1 + .../apache/ignite/cache/CacheConfiguration.java | 18 + .../apache/ignite/cache/store/CacheStore.java | 12 + .../processors/cache/GridCacheAttributes.java | 44 +- .../processors/cache/GridCacheContext.java | 7 + .../processors/cache/GridCacheMapEntry.java | 4 +- .../processors/cache/GridCacheProcessor.java | 17 +- .../processors/cache/GridCacheStoreManager.java | 78 ++-- .../dht/atomic/GridDhtAtomicCache.java | 6 +- .../local/atomic/GridLocalAtomicCache.java | 4 +- .../transactions/IgniteTxLocalAdapter.java | 5 +- ...CacheJdbcBlobStoreMultithreadedSelfTest.java | 1 + .../cache/IgniteCacheAbstractTest.java | 1 + .../IgniteCacheAtomicStoreSessionTest.java | 55 +++ .../IgniteCacheLoaderWriterAbstractTest.java | 157 +++---- .../IgniteCacheStoreSessionAbstractTest.java | 303 +++++++++++++ .../IgniteCacheStoreSessionTest.java | 435 ------------------- .../IgniteCacheTxStoreSessionTest.java | 285 ++++++++++++ .../GridCacheLoadOnlyStoreAdapterSelfTest.java | 1 + .../cache/GridCacheAbstractSelfTest.java | 1 + .../cache/GridCacheBasicStoreAbstractTest.java | 1 + ...acheBasicStoreMultithreadedAbstractTest.java | 1 + ...idCacheConfigurationConsistencySelfTest.java | 5 + ...idCacheGetAndTransformStoreAbstractTest.java | 1 + .../GridCacheGroupLockAbstractSelfTest.java | 1 + 
.../cache/GridCacheLifecycleAwareSelfTest.java | 1 + .../cache/GridCachePartitionedWritesTest.java | 1 + .../cache/GridCacheReloadSelfTest.java | 1 + .../cache/GridCacheStorePutxSelfTest.java | 1 + .../cache/GridCacheSwapReloadSelfTest.java | 2 + .../GridCacheWriteBehindStoreAbstractTest.java | 1 + ...BehindStorePartitionedMultiNodeSelfTest.java | 1 + .../IgniteTxExceptionAbstractSelfTest.java | 1 + .../IgniteTxStoreExceptionAbstractSelfTest.java | 1 + ...actQueueFailoverDataConsistencySelfTest.java | 1 + ...CacheAtomicReferenceApiSelfAbstractTest.java | 2 + .../GridCacheClientModesAbstractSelfTest.java | 2 + ...chePartitionedReloadAllAbstractSelfTest.java | 1 + .../IgniteTxPreloadAbstractTest.java | 6 +- ...heAbstractTransformWriteThroughSelfTest.java | 1 + .../dht/GridCacheColocatedDebugTest.java | 1 + .../near/GridCacheGetStoreErrorSelfTest.java | 1 + .../near/GridCacheNearOneNodeSelfTest.java | 1 + .../GridCacheNearPartitionedClearSelfTest.java | 1 + ...ePartitionedBasicStoreMultiNodeSelfTest.java | 1 + .../GridCachePartitionedLoadCacheSelfTest.java | 1 + .../GridCachePartitionedStorePutSelfTest.java | 1 + .../near/GridPartitionedBackupLoadSelfTest.java | 1 + .../GridCacheBatchEvictUnswapSelfTest.java | 1 + .../GridCacheEmptyEntriesAbstractSelfTest.java | 1 + .../GridCacheEvictionTouchSelfTest.java | 1 + .../local/GridCacheLocalLoadAllSelfTest.java | 1 + ...ridCacheContinuousQueryAbstractSelfTest.java | 1 + .../GridCacheWriteBehindStoreLoadTest.java | 1 + .../loadtests/hashmap/GridCacheTestContext.java | 8 +- .../swap/GridSwapEvictAllBenchmark.java | 1 + .../bamboo/GridDataGridTestSuite.java | 5 +- .../cache/GridCacheAbstractQuerySelfTest.java | 1 + .../cache/GridCacheQueryLoadSelfTest.java | 1 + 59 files changed, 921 insertions(+), 576 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java b/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java index 6eb6184..9cc1fc2 100644 --- a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java @@ -266,6 +266,7 @@ public abstract class GridClientAbstractSelfTest extends GridCommonAbstractTest cfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(cacheStore)); cfg.setWriteThrough(true); cfg.setReadThrough(true); + cfg.setLoadPreviousValue(true); cfg.setSwapEnabled(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/apache/ignite/cache/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheConfiguration.java index 54c3444..5a91df8 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheConfiguration.java @@ -239,6 +239,9 @@ public class CacheConfiguration extends MutableConfiguration { /** */ private Factory storeFactory; + /** */ + private boolean loadPrevVal; + /** Node group resolver. 
*/ private GridCacheAffinityFunction aff; @@ -383,6 +386,7 @@ public class CacheConfiguration extends MutableConfiguration { isWriteThrough = cc.isWriteThrough(); keepPortableInStore = cc.isKeepPortableInStore(); listenerConfigurations = cc.listenerConfigurations; + loadPrevVal = cc.isLoadPreviousValue(); offHeapMaxMem = cc.getOffHeapMaxMemory(); maxConcurrentAsyncOps = cc.getMaxConcurrentAsyncOperations(); maxQryIterCnt = cc.getMaximumQueryIteratorCount(); @@ -790,6 +794,20 @@ public class CacheConfiguration extends MutableConfiguration { } /** + * @return + */ + public boolean isLoadPreviousValue() { + return loadPrevVal; + } + + /** + * @param loadPrevVal + */ + public void setLoadPreviousValue(boolean loadPrevVal) { + this.loadPrevVal = loadPrevVal; + } + + /** * Gets factory for underlying persistent storage for read-through and write-through operations. * * @return Cache store factory. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java index 0b0832e..d2593ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cache.store.jdbc.*; import org.apache.ignite.lang.*; import org.apache.ignite.portables.*; +import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; @@ -125,6 +126,10 @@ public abstract class CacheStore<K, V> implements CacheLoader<K, V>, CacheWriter /** */ private CacheStoreSession ses; + /** */ + @IgniteInstanceResource + private Ignite ignite; + /** * Loads all 
values from underlying persistent storage. Note that keys are not * passed, so it is up to implementation to figure out what to load. This method @@ -165,4 +170,11 @@ public abstract class CacheStore<K, V> implements CacheLoader<K, V>, CacheWriter @Nullable public CacheStoreSession session() { return ses; } + + /** + * @return {@link Ignite} instance. + */ + public Ignite ignite() { + return ignite; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java index da76ecc..711d375 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java @@ -88,9 +88,6 @@ public class GridCacheAttributes implements Externalizable { /** Flag indicating whether query indexing is enabled. */ private boolean qryIdxEnabled; - /** Flag indicating whether GridGain should activate read-through/write-through behaviour by default. */ - private boolean storeEnabled; - /** Flag indicating whether GridGain should use write-behind behaviour for the cache store. */ private boolean writeBehindEnabled; @@ -154,6 +151,15 @@ public class GridCacheAttributes implements Externalizable { /** Transaction Manager lookup class name. */ private String tmLookupClsName; + /** Store read-through flag. */ + private boolean readThrough; + + /** Store write-through flag. */ + private boolean writeThrough; + + /** Store load previous value flag. */ + private boolean loadPrevVal; + /** * @param cfg Cache configuration. * @param store Cache store. 
@@ -167,13 +173,14 @@ public class GridCacheAttributes implements Externalizable { evictNearSync = cfg.isEvictNearSynchronized(); evictSync = cfg.isEvictSynchronized(); indexingSpiName = cfg.getIndexingSpiName(); + loadPrevVal = cfg.isLoadPreviousValue(); name = cfg.getName(); partDistro = GridCacheUtils.distributionMode(cfg); preloadBatchSize = cfg.getPreloadBatchSize(); preloadMode = cfg.getPreloadMode(); qryIdxEnabled = cfg.isQueryIndexEnabled(); + readThrough = cfg.isReadThrough(); seqReserveSize = cfg.getAtomicSequenceReserveSize(); - storeEnabled = store != null; storeValBytes = cfg.isStoreValueBytes(); swapEnabled = cfg.isSwapEnabled(); ttl = cfg.getDefaultTimeToLive(); @@ -183,6 +190,7 @@ public class GridCacheAttributes implements Externalizable { writeBehindFlushSize = cfg.getWriteBehindFlushSize(); writeBehindFlushThreadCnt = cfg.getWriteBehindFlushThreadCount(); writeSyncMode = cfg.getWriteSynchronizationMode(); + writeThrough = cfg.isWriteThrough(); affMapperClsName = className(cfg.getAffinityMapper()); @@ -451,10 +459,24 @@ public class GridCacheAttributes implements Externalizable { } /** - * @return Flag indicating whether GridGain should activate read-through/write-through behaviour by default. + * @return Flag indicating whether read-through behaviour is enabled. + */ + public boolean readThrough() { + return readThrough; + } + + /** + * @return Flag indicating whether read-through behaviour is enabled. + */ + public boolean writeThrough() { + return writeThrough; + } + + /** + * @return Flag indicating whether old value is loaded from store for cache operation. 
*/ - public boolean storeEnabled() { - return storeEnabled; + public boolean loadPreviousValue() { + return loadPrevVal; } /** @@ -516,13 +538,14 @@ public class GridCacheAttributes implements Externalizable { out.writeBoolean(evictNearSync); out.writeBoolean(evictSync); U.writeString(out, indexingSpiName); + out.writeBoolean(loadPrevVal); U.writeString(out, name); U.writeEnum0(out, partDistro); out.writeInt(preloadBatchSize); U.writeEnum0(out, preloadMode); out.writeBoolean(qryIdxEnabled); + out.writeBoolean(readThrough); out.writeInt(seqReserveSize); - out.writeBoolean(storeEnabled); out.writeBoolean(storeValBytes); out.writeBoolean(swapEnabled); out.writeLong(ttl); @@ -532,6 +555,7 @@ public class GridCacheAttributes implements Externalizable { out.writeInt(writeBehindFlushSize); out.writeInt(writeBehindFlushThreadCnt); U.writeEnum0(out, writeSyncMode); + out.writeBoolean(writeThrough); U.writeString(out, affClsName); U.writeString(out, affMapperClsName); @@ -560,13 +584,14 @@ public class GridCacheAttributes implements Externalizable { evictNearSync = in.readBoolean(); evictSync = in.readBoolean(); indexingSpiName = U.readString(in); + loadPrevVal = in.readBoolean(); name = U.readString(in); partDistro = GridCacheDistributionMode.fromOrdinal(U.readEnumOrdinal0(in)); preloadBatchSize = in.readInt(); preloadMode = GridCachePreloadMode.fromOrdinal(U.readEnumOrdinal0(in)); qryIdxEnabled = in.readBoolean(); + readThrough = in.readBoolean(); seqReserveSize = in.readInt(); - storeEnabled = in.readBoolean(); storeValBytes = in.readBoolean(); swapEnabled = in.readBoolean(); ttl = in.readLong(); @@ -576,6 +601,7 @@ public class GridCacheAttributes implements Externalizable { writeBehindFlushSize = in.readInt(); writeBehindFlushThreadCnt = in.readInt(); writeSyncMode = GridCacheWriteSynchronizationMode.fromOrdinal(U.readEnumOrdinal0(in)); + writeThrough = in.readBoolean(); affClsName = U.readString(in); affMapperClsName = U.readString(in); 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java index 7508ac4..b61b4cf 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java @@ -1406,6 +1406,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return {@code True} if store read-through mode is enabled. + */ + public boolean loadPreviousValue() { + return cacheCfg.isLoadPreviousValue(); + } + + /** * @return {@code True} if store write-through is enabled. */ public boolean writeThrough() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index d9c84ab..12e7b82 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -1426,7 +1426,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridCacheValueBytes oldBytes = valueBytesUnlocked(); - if (needVal && old == null && cctx.readThrough()) { + if (needVal && old == null && (cctx.readThrough() && (op == TRANSFORM || cctx.loadPreviousValue()))) { old = readThrough(null, 
key, false, CU.<K, V>empty(), subjId, taskName); // Detach value before index update. @@ -1766,7 +1766,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridCacheValueBytes oldBytes = valueBytesUnlocked(); - if (needVal && old == null && cctx.readThrough()) { + if (needVal && old == null && (cctx.readThrough() && (op == TRANSFORM || cctx.loadPreviousValue()))) { old = readThrough(null, key, false, CU.<K, V>empty(), subjId, taskName); // Detach value before index update. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java index e013216..793db4e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java @@ -598,6 +598,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { Collection<GridCacheAdapter<?, ?>> startSeq = new ArrayList<>(cfgs.length); + IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>(); + for (int i = 0; i < cfgs.length; i++) { CacheConfiguration cfg = new CacheConfiguration(cfgs[i]); @@ -632,7 +634,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheTtlManager ttlMgr = new GridCacheTtlManager(); GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class); - GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, cfgStore, cfg); + GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, sesHolders, cfgStore, cfg); GridCacheContext<?, ?> cacheCtx = new GridCacheContext( ctx, @@ -1116,10 +1118,21 @@ public class 
GridCacheProcessor extends GridProcessorAdapter { CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicy", "Eviction policy", locAttr.evictionPolicyClassName(), rmtAttr.evictionPolicyClassName(), true); - if (!skipStoreConsistencyCheck(locAttr, rmtAttr)) + if (!skipStoreConsistencyCheck(locAttr, rmtAttr)) { CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "store", "Cache store", locAttr.storeClassName(), rmtAttr.storeClassName(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "readThrough", + "Read through enabled", locAttr.readThrough(), locAttr.readThrough(), true); + + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeThrough", + "Write through enabled", locAttr.writeThrough(), locAttr.writeThrough(), true); + + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "loadPreviousValue", + "Load previous value enabled", locAttr.loadPreviousValue(), + locAttr.loadPreviousValue(), true); + } + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cloner", "Cache cloner", locAttr.clonerClassName(), rmtAttr.clonerClassName(), false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java index aeb9110..7d92f83 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java @@ -56,10 +56,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { private final CacheStoreBalancingWrapper<K, Object> singleThreadGate; /** */ - private final 
ThreadLocal<SessionData> sesHolder = new ThreadLocal<>(); - - /** */ - private final boolean sesEnabled; + private final ThreadLocal<SessionData> sesHolder; /** */ private final boolean locStore; @@ -69,12 +66,15 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { /** * @param ctx Kernal context. + * @param sesHolders Session holders map to use the same session holder for different managers if they use + * the same store instance. * @param cfgStore Store provided in configuration. * @param cfg Cache configuration. * @throws IgniteCheckedException In case of error. */ @SuppressWarnings("unchecked") public GridCacheStoreManager(GridKernalContext ctx, + IdentityHashMap<CacheStore, ThreadLocal> sesHolders, @Nullable CacheStore<K, Object> cfgStore, CacheConfiguration cfg) throws IgniteCheckedException { this.cfgStore = cfgStore; @@ -83,24 +83,34 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { singleThreadGate = store == null ? 
null : new CacheStoreBalancingWrapper<>(store); - if (cfgStore != null && cfg.getAtomicityMode() == TRANSACTIONAL) { + ThreadLocal<SessionData> sesHolder0 = null; + + if (cfgStore != null) { try { - Field sesField = CacheStore.class.getDeclaredField("ses"); + if (!sesHolders.containsKey(cfgStore)) { + sesHolder0 = new ThreadLocal<>(); + + Field sesField = CacheStore.class.getDeclaredField("ses"); - sesField.setAccessible(true); + sesField.setAccessible(true); - sesField.set(cfgStore, new ThreadLocalSession(sesHolder)); + sesField.set(cfgStore, new ThreadLocalSession(sesHolder0)); - sesEnabled = true; + sesHolders.put(cfgStore, sesHolder0); + } + else + sesHolder0 = sesHolders.get(cfgStore); } catch (IllegalAccessException | NoSuchFieldException e) { throw new IgniteCheckedException(e); } } - else - sesEnabled = true; + + sesHolder = sesHolder0; locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class); + + assert sesHolder != null || cfgStore == null; } /** @@ -236,7 +246,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { Object val = null; - boolean ses = initSession(tx); + initSession(tx); try { val = singleThreadGate.load(key); @@ -251,8 +261,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - if (ses) - sesHolder.set(null); + sesHolder.set(null); } if (log.isDebugEnabled()) @@ -374,7 +383,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Loading values from store for keys: " + keys0); - boolean ses = initSession(tx); + initSession(tx); try { CI2<K, Object> c = new CI2<K, Object>() { @@ -419,8 +428,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - if (ses) - sesHolder.set(null); + sesHolder.set(null); } if (log.isDebugEnabled()) @@ 
-506,7 +514,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Storing value in cache store [key=" + key + ", val=" + val + ']'); - boolean ses = initSession(tx); + initSession(tx); try { store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val)); @@ -518,8 +526,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { throw new IgniteCheckedException(e); } finally { - if (ses) - sesHolder.set(null); + sesHolder.set(null); } if (log.isDebugEnabled()) @@ -575,7 +582,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : map.entrySet()) entries.add(new CacheEntryImpl<>(e.getKey(), locStore ? e.getValue() : e.getValue().get1())); - boolean ses = initSession(tx); + initSession(tx); try { store.writeAll(entries); @@ -596,8 +603,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { throw new IgniteCheckedException(e); } finally { - if (ses) - sesHolder.set(null); + sesHolder.set(null); } if (log.isDebugEnabled()) @@ -628,7 +634,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Removing value from cache store [key=" + key + ']'); - boolean ses = initSession(tx); + initSession(tx); try { store.delete(key); @@ -640,8 +646,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { throw new IgniteCheckedException(e); } finally { - if (ses) - sesHolder.set(null); + sesHolder.set(null); } if (log.isDebugEnabled()) @@ -677,7 +682,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Removing values from cache store [keys=" + keys0 + ']'); - boolean ses = initSession(tx); + initSession(tx); try { store.deleteAll(keys0); @@ -692,8 +697,7 @@ public class GridCacheStoreManager<K, V> 
extends GridCacheManagerAdapter<K, V> { throw new IgniteCheckedException(e); } finally { - if (ses) - sesHolder.set(null); + sesHolder.set(null); } if (log.isDebugEnabled()) @@ -728,17 +732,15 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { assert store != null; - boolean ses = initSession(tx); + initSession(tx); try { store.txEnd(commit); } finally { - if (ses) { - sesHolder.set(null); + sesHolder.set(null); - ((GridMetadataAware)tx).removeMeta(SES_ATTR); - } + ((GridMetadataAware)tx).removeMeta(SES_ATTR); } } @@ -760,12 +762,8 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { /** * @param tx Current transaction. - * @return {@code True} if thread local session was initialized. */ - private boolean initSession(@Nullable IgniteTx tx) { - if (!sesEnabled) - return false; - + private void initSession(@Nullable IgniteTx tx) { SessionData ses; if (tx != null) { @@ -781,8 +779,6 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { ses = new SessionData(null, cctx.name()); sesHolder.set(ses); - - return true; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 23f5ce3..186f2a0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1214,7 
+1214,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts. assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll. - if (!F.isEmpty(req.filter())) { + if (!F.isEmpty(req.filter()) && ctx.loadPreviousValue()) { try { reloadIfNeeded(locked); } @@ -1416,7 +1416,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { V old = entry.innerGet( null, /*read swap*/true, - /*read through*/true, + /*read through*/ctx.loadPreviousValue(), /*fail fast*/false, /*unmarshal*/true, /*metrics*/true, @@ -1450,7 +1450,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { V old = entry.innerGet( null, /*read swap*/true, - /*read through*/true, + /*read through*/ctx.loadPreviousValue(), /*fail fast*/false, /*unmarshal*/true, /*metrics*/true, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index 978072e..c4080d0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1148,7 +1148,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (intercept) { V old = entry.innerGet(null, /*swap*/true, - /*read-through*/true, + /*read-through*/ctx.loadPreviousValue(), /*fail-fast*/false, /*unmarshal*/true, /**update-metrics*/true, @@ -1179,7 +1179,7 
@@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (intercept) { V old = entry.innerGet(null, /*swap*/true, - /*read-through*/true, + /*read-through*/ctx.loadPreviousValue(), /*fail-fast*/false, /*unmarshal*/true, /**update-metrics*/true, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java index 123072e..11cd1af 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1942,7 +1942,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> V old = null; - boolean readThrough = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); + boolean readThrough = + cacheCtx.loadPreviousValue() && !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); if (optimistic()) { try { @@ -2275,7 +2276,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (!hasPrevVal) v = cached.innerGet(this, /*swap*/true, - /*read-through*/true, + /*read-through*/invoke || cacheCtx.loadPreviousValue(), /*failFast*/false, /*unmarshal*/true, /*metrics*/!invoke, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java index 8075172..01ae3a6 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java @@ -103,6 +103,7 @@ public class GridCacheJdbcBlobStoreMultithreadedSelfTest extends GridCommonAbstr cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store())); cc.setReadThrough(true); cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); c.setCacheConfiguration(cc); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java index bf8206c..ac51490 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java @@ -132,6 +132,7 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest { cfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); cfg.setReadThrough(true); cfg.setWriteThrough(true); + cfg.setLoadPreviousValue(true); } if (cacheMode() == PARTITIONED) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicStoreSessionTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicStoreSessionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicStoreSessionTest.java new file mode 100644 index 0000000..4488e04 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicStoreSessionTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.integration; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheAtomicStoreSessionTest extends IgniteCacheStoreSessionAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { + return PRIMARY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java index 7564f06..8bff838 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java @@ -28,8 +28,6 @@ import javax.cache.processor.*; import java.util.*; import java.util.concurrent.atomic.*; -import static 
org.gridgain.grid.cache.GridCacheAtomicityMode.*; - /** * */ @@ -72,88 +70,75 @@ public abstract class IgniteCacheLoaderWriterAbstractTest extends IgniteCacheAbs storeMap.clear(); } - protected boolean putFromPrimary() { - return atomicityMode() == ATOMIC; - } - /** * @throws Exception If failed. */ public void testLoaderWriter() throws Exception { - final Object key = Integer.MAX_VALUE; - - for (int i = 0; i < gridCount(); i++) { - log.info("Test with grid: " + i); + IgniteCache<Object, Object> cache = jcache(0); - storeMap.clear(); + Object key = primaryKey(cache); - ldrCallCnt.set(0); - writerCallCnt.set(0); + assertNull(cache.get(key)); - IgniteCache<Object, Object> cache = jcache(i); + checkCalls(1, 0); - assertNull(cache.get(key)); + storeMap.put(key, "test"); - checkCalls(1, 0); + assertEquals("test", cache.get(key)); - storeMap.put(key, "test"); + checkCalls(1, 0); - assertEquals("test", cache.get(key)); + assertTrue(storeMap.containsKey(key)); - checkCalls(2, 0); + cache.remove(key); - assertTrue(storeMap.containsKey(key)); + checkCalls(0, 1); - cache.remove(key); - - checkCalls(2, 1); - - assertFalse(storeMap.containsKey(key)); + assertFalse(storeMap.containsKey(key)); - assertNull(cache.get(key)); + assertNull(cache.get(key)); - checkCalls(3, 1); + checkCalls(1, 0); - cache.put(key, "test1"); + cache.put(key, "test1"); - checkCalls(3, 2); + checkCalls(0, 1); - assertEquals("test1", storeMap.get(key)); + assertEquals("test1", storeMap.get(key)); - assertEquals("test1", cache.get(key)); + assertEquals("test1", cache.get(key)); - checkCalls(3, 2); + checkCalls(0, 0); - cache.invoke(key, new EntryProcessor<Object, Object, Object>() { - @Override public Object process(MutableEntry<Object, Object> e, Object... args) { - e.setValue("test2"); + cache.invoke(key, new EntryProcessor<Object, Object, Object>() { + @Override public Object process(MutableEntry<Object, Object> e, Object... 
args) { + e.setValue("test2"); - return null; - } - }); + return null; + } + }); - checkCalls(3, 3); + checkCalls(0, 1); - assertEquals("test2", storeMap.get(key)); + assertEquals("test2", storeMap.get(key)); - assertEquals("test2", cache.get(key)); + assertEquals("test2", cache.get(key)); - checkCalls(3, 3); + checkCalls(0, 0); - cache.invoke(key, new EntryProcessor<Object, Object, Object>() { - @Override public Object process(MutableEntry<Object, Object> e, Object... args) { - e.remove(); + cache.invoke(key, new EntryProcessor<Object, Object, Object>() { + @Override public Object process(MutableEntry<Object, Object> e, Object... args) { + e.remove(); - return null; - } - }); + return null; + } + }); - checkCalls(3, 4); + checkCalls(0, 1); - assertFalse(storeMap.containsKey(key)); + assertFalse(storeMap.containsKey(key)); - assertNull(cache.get(key)); - } + assertNull(cache.get(key)); } /** @@ -162,43 +147,63 @@ public abstract class IgniteCacheLoaderWriterAbstractTest extends IgniteCacheAbs public void testLoaderWriterBulk() throws Exception { Map<Object, Object> vals = new HashMap<>(); - for (int i = 0; i < 100; i++) - vals.put(i, i); + IgniteCache<Object, Object> cache = jcache(0); - for (int i = 0; i < gridCount(); i++) { - log.info("Test with grid: " + i); + for (Object key : primaryKeys(cache, 100, 0)) + vals.put(key, key); - storeMap.clear(); + assertTrue(cache.getAll(vals.keySet()).isEmpty()); - ldrCallCnt.set(0); - writerCallCnt.set(0); + checkCalls(1, 0); - IgniteCache<Object, Object> cache = jcache(i); + storeMap.putAll(vals); - assertTrue(cache.getAll(vals.keySet()).isEmpty()); + assertEquals(vals, cache.getAll(vals.keySet())); - int expLoads = gridCount(); + checkCalls(1, 0); - checkCalls(expLoads, 0); + for (Object key : vals.keySet()) + assertEquals(key, storeMap.get(key)); - storeMap.putAll(vals); + cache.removeAll(vals.keySet()); - assertEquals(vals, cache.getAll(vals.keySet())); + checkCalls(0, 1); - expLoads += gridCount(); + for (Object key : 
vals.keySet()) + assertFalse(storeMap.containsKey(key)); - checkCalls(expLoads, 0); + cache.putAll(vals); - for (Object key : vals.keySet()) - assertTrue(storeMap.contains(key)); + checkCalls(0, 1); - cache.removeAll(vals.keySet()); + for (Object key : vals.keySet()) + assertEquals(key, storeMap.get(key)); - checkCalls(expLoads, gridCount()); + cache.invokeAll(vals.keySet(), new EntryProcessor<Object, Object, Object>() { + @Override public Object process(MutableEntry<Object, Object> entry, Object... args) { + entry.setValue("test1"); - for (Object key : vals.keySet()) - assertFalse(storeMap.containsKey(key)); - } + return null; + } + }); + + checkCalls(0, 1); + + for (Object key : vals.keySet()) + assertEquals("test1", storeMap.get(key)); + + cache.invokeAll(vals.keySet(), new EntryProcessor<Object, Object, Object>() { + @Override public Object process(MutableEntry<Object, Object> entry, Object... args) { + entry.remove(); + + return null; + } + }); + + checkCalls(0, 1); + + for (Object key : vals.keySet()) + assertFalse(storeMap.containsKey(key)); } /** @@ -207,8 +212,10 @@ public abstract class IgniteCacheLoaderWriterAbstractTest extends IgniteCacheAbs */ private void checkCalls(int expLdr, int expWriter) { assertEquals(expLdr, ldrCallCnt.get()); - assertEquals(expWriter, writerCallCnt.get()); + + ldrCallCnt.set(0); + writerCallCnt.set(0); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java new file mode 100644 index 0000000..87de0bf --- /dev/null +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java @@ -0,0 +1,303 @@ +package org.apache.ignite.internal.processors.cache.integration; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.util.typedef.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import javax.cache.processor.*; +import java.util.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; + +/** + * + */ +public abstract class IgniteCacheStoreSessionAbstractTest extends IgniteCacheAbstractTest { + /** */ + protected static volatile List<ExpectedData> expData; + + /** */ + protected static final String CACHE_NAME1 = "cache1"; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TestStore store = new TestStore(); // Use the same store instance for both caches. 
+ + assert cfg.getCacheConfiguration().length == 1; + + CacheConfiguration ccfg0 = cfg.getCacheConfiguration()[0]; + + ccfg0.setReadThrough(true); + ccfg0.setWriteThrough(true); + + ccfg0.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + + CacheConfiguration ccfg1 = cacheConfiguration(gridName); + + ccfg1.setReadThrough(true); + ccfg1.setWriteThrough(true); + + ccfg1.setName(CACHE_NAME1); + + ccfg1.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + + cfg.setCacheConfiguration(ccfg0, ccfg1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + expData = null; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + expData = Collections.synchronizedList(new ArrayList<ExpectedData>()); + + super.beforeTestsStarted(); + } + + /** + * @param cache Cache. + * @param cnt Keys count. + * @return Keys. + * @throws Exception If failed. + */ + protected List<Integer> testKeys(IgniteCache cache, int cnt) throws Exception { + return primaryKeys(cache, cnt, 0); + } + + /** + * @throws Exception If failed. + */ + public void testStoreSession() throws Exception { + assertNull(jcache(0).getName()); + + assertEquals(CACHE_NAME1, ignite(0).jcache(CACHE_NAME1).getName()); + + testStoreSession(jcache(0)); + + testStoreSession(ignite(0).jcache(CACHE_NAME1)); + } + + /** + * @param cache Cache. + * @throws Exception If failed. 
+ */ + private void testStoreSession(IgniteCache<Object, Object> cache) throws Exception { + Set<Integer> keys = new HashSet<>(primaryKeys(cache, 3, 100_000)); + + Integer key = keys.iterator().next(); + + boolean tx = atomicityMode() == TRANSACTIONAL; + + expData.add(new ExpectedData(false, "load", new HashMap<>(), cache.getName())); + + assertEquals(key, cache.get(key)); + + assertTrue(expData.isEmpty()); + + expData.add(new ExpectedData(false, "loadAll", new HashMap<>(), cache.getName())); + + assertEquals(3, cache.getAll(keys).size()); + + assertTrue(expData.isEmpty()); + + expectedData(tx, "write", cache.getName()); + + cache.put(key, key); + + assertTrue(expData.isEmpty()); + + expectedData(tx, "write", cache.getName()); + + cache.invoke(key, new EntryProcessor<Object, Object, Object>() { + @Override public Object process(MutableEntry<Object, Object> e, Object... args) { + e.setValue("val1"); + + return null; + } + }); + + assertTrue(expData.isEmpty()); + + expectedData(tx, "delete", cache.getName()); + + cache.remove(key); + + assertTrue(expData.isEmpty()); + + Map<Object, Object> vals = new HashMap<>(); + + for (Object key0 : keys) + vals.put(key0, key0); + + expectedData(tx, "writeAll", cache.getName()); + + cache.putAll(vals); + + assertTrue(expData.isEmpty()); + + expectedData(tx, "deleteAll", cache.getName()); + + cache.removeAll(keys); + + assertTrue(expData.isEmpty()); + } + + /** + * @param tx {@code True} if transaction is expected. + * @param expMtd Expected method. + * @param expCacheName Expected cache name. 
+ */ + private void expectedData(boolean tx, String expMtd, String expCacheName) { + expData.add(new ExpectedData(tx, expMtd, new HashMap<>(), expCacheName)); + + if (tx) + expData.add(new ExpectedData(true, "txEnd", F.<Object, Object>asMap(0, expMtd), expCacheName)); + } + + /** + * + */ + static class ExpectedData { + /** */ + private final boolean tx; + + /** */ + private final String expMtd; + + /** */ + private final Map<Object, Object> expProps; + + /** */ + private final String expCacheName; + + /** + * @param tx {@code True} if transaction is enabled. + * @param expMtd Expected method. + * @param expProps Expected properties. + * @param expCacheName Expected cache name. + */ + public ExpectedData(boolean tx, String expMtd, Map<Object, Object> expProps, String expCacheName) { + this.tx = tx; + this.expMtd = expMtd; + this.expProps = expProps; + this.expCacheName = expCacheName; + } + } + + /** + * + */ + private class TestStore extends CacheStore<Object, Object> { + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... 
args) { + fail(); + } + + /** {@inheritDoc} */ + @Override public void txEnd(boolean commit) throws CacheWriterException { + log.info("Tx end [commit=" + commit + ", tx=" + session().transaction() + ']'); + + checkSession("txEnd"); + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) throws CacheLoaderException { + log.info("Load [key=" + key + ", tx=" + session().transaction() + ']'); + + checkSession("load"); + + return key; + } + + /** {@inheritDoc} */ + @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException { + log.info("LoadAll [keys=" + keys + ", tx=" + session().transaction() + ']'); + + checkSession("loadAll"); + + Map<Object, Object> loaded = new HashMap<>(); + + for (Object key : keys) + loaded.put(key, key); + + return loaded; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException { + log.info("Write [write=" + entry + ", tx=" + session().transaction() + ']'); + + checkSession("write"); + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) throws CacheWriterException { + log.info("WriteAll: [writeAll=" + entries + ", tx=" + session().transaction() + ']'); + + checkSession("writeAll"); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + log.info("Delete [key=" + key + ", tx=" + session().transaction() + ']'); + + checkSession("delete"); + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) throws CacheWriterException { + log.info("DeleteAll [keys=" + keys + ", tx=" + session().transaction() + ']'); + + checkSession("deleteAll"); + } + + /** + * @param mtd Called store method. 
+ */ + private void checkSession(String mtd) { + assertNotNull(ignite()); + + assertFalse(expData.isEmpty()); + + ExpectedData exp = expData.remove(0); + + assertEquals(exp.expMtd, mtd); + + CacheStoreSession ses = session(); + + assertNotNull(ses); + + if (exp.tx) + assertNotNull(ses.transaction()); + else + assertNull(ses.transaction()); + + Map<Object, Object> props = ses.properties(); + + assertNotNull(props); + + assertEquals(exp.expProps, props); + + props.put(props.size(), mtd); + + assertEquals(exp.expCacheName, ses.cacheName()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionTest.java deleted file mode 100644 index d333456..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionTest.java +++ /dev/null @@ -1,435 +0,0 @@ -package org.apache.ignite.internal.processors.cache.integration; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.util.typedef.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.integration.*; -import java.util.*; - -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; -import static 
org.gridgain.grid.cache.GridCacheAtomicityMode.*; -import static org.gridgain.grid.cache.GridCacheDistributionMode.*; -import static org.gridgain.grid.cache.GridCacheMode.*; - -/** - * - */ -public class IgniteCacheStoreSessionTest extends IgniteCacheAbstractTest { - /** */ - private static volatile List<ExpectedData> expData; - - /** */ - private static final String CACHE_NAME1 = "cache1"; - - private TestStore store = new TestStore(); - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** {@inheritDoc} */ - @Override protected GridCacheMode cacheMode() { - return PARTITIONED; - } - - /** {@inheritDoc} */ - @Override protected GridCacheAtomicityMode atomicityMode() { - return TRANSACTIONAL; - } - - /** {@inheritDoc} */ - @Override protected GridCacheDistributionMode distributionMode() { - return PARTITIONED_ONLY; - } - - /** {@inheritDoc} */ - @Override protected CacheStore<?, ?> cacheStore() { - return store; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - assert cfg.getCacheConfiguration().length == 1; - - CacheConfiguration ccfg0 = cfg.getCacheConfiguration()[0]; - - CacheConfiguration ccfg1 = cacheConfiguration(gridName); - - ccfg1.setName(CACHE_NAME1); - - cfg.setCacheConfiguration(ccfg0, ccfg1); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - expData = null; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - expData = Collections.synchronizedList(new ArrayList<ExpectedData>()); - - super.beforeTestsStarted(); - } - - /** - * @throws Exception If failed. 
- */ - public void testStoreSession() throws Exception { - testTxPut(jcache(0), null, null); - - testTxPut(ignite(0).jcache(CACHE_NAME1), null, null); - - testTxRemove(null, null); - - testTxPutRemove(null, null); - - for (IgniteTxConcurrency concurrency : F.asList(PESSIMISTIC)) { - for (IgniteTxIsolation isolation : F.asList(REPEATABLE_READ)) { - testTxPut(jcache(0), concurrency, isolation); - - testTxRemove(concurrency, isolation); - - testTxPutRemove(concurrency, isolation); - } - } - } - - /** - * @param cnt Keys count. - * @return Keys. - * @throws Exception If failed. - */ - private List<Integer> testKeys(IgniteCache cache, int cnt) throws Exception { - return primaryKeys(cache, cnt, 0); - } - - /** - * @param concurrency Concurrency mode. - * @param isolation Isolation mode. - * @throws Exception If failed. - */ - private void testTxPutRemove(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { - log.info("Test tx put/remove [concurrency=" + concurrency + ", isolation=" + isolation + ']'); - - IgniteCache<Integer, Integer> cache = jcache(0); - - List<Integer> keys = testKeys(cache, 3); - - Integer key1 = keys.get(0); - Integer key2 = keys.get(1); - Integer key3 = keys.get(2); - - try (IgniteTx tx = startTx(concurrency, isolation)) { - log.info("Do tx put1."); - - cache.put(key1, key1); - - log.info("Do tx put2."); - - cache.put(key2, key2); - - log.info("Do tx remove."); - - cache.remove(key3); - - expData.add(new ExpectedData("writeAll", new HashMap<>(), null)); - expData.add(new ExpectedData("delete", F.<Object, Object>asMap(0, "writeAll"), null)); - expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, "writeAll", 1, "delete"), null)); - - log.info("Do tx commit."); - - tx.commit(); - } - - assertEquals(0, expData.size()); - } - - /** - * @param concurrency Concurrency mode. - * @param isolation Isolation mode. - * @throws Exception If failed. 
- */ - private void testTxPut(IgniteCache<Object, Object> cache, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation) throws Exception { - log.info("Test tx put [concurrency=" + concurrency + ", isolation=" + isolation + ']'); - - List<Integer> keys = testKeys(cache, 3); - - Integer key1 = keys.get(0); - - try (IgniteTx tx = startTx(concurrency, isolation)) { - log.info("Do tx get."); - - cache.get(key1); - - log.info("Do tx put."); - - cache.put(key1, key1); - - expData.add(new ExpectedData("write", new HashMap<>(), cache.getName())); - expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, "write"), cache.getName())); - - log.info("Do tx commit."); - - tx.commit(); - } - - assertEquals(0, expData.size()); - - Integer key2 = keys.get(1); - Integer key3 = keys.get(2); - - try (IgniteTx tx = startTx(concurrency, isolation);) { - log.info("Do tx put1."); - - cache.put(key2, key2); - - log.info("Do tx put2."); - - cache.put(key3, key3); - - expData.add(new ExpectedData("writeAll", new HashMap<>(), cache.getName())); - expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, "writeAll"), cache.getName())); - - log.info("Do tx commit."); - - tx.commit(); - } - - assertEquals(0, expData.size()); - } - - /** - * @param concurrency Concurrency mode. - * @param isolation Isolation mode. - * @throws Exception If failed. 
- */ - private void testTxRemove(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { - log.info("Test tx remove [concurrency=" + concurrency + ", isolation=" + isolation + ']'); - - IgniteCache<Integer, Integer> cache = jcache(0); - - List<Integer> keys = testKeys(cache, 3); - - Integer key1 = keys.get(0); - - try (IgniteTx tx = startTx(concurrency, isolation)) { - log.info("Do tx get."); - - cache.get(key1); - - log.info("Do tx remove."); - - cache.remove(key1, key1); - - expData.add(new ExpectedData("delete", new HashMap<>(), null)); - expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, "delete"), null)); - - log.info("Do tx commit."); - - tx.commit(); - } - - assertEquals(0, expData.size()); - - Integer key2 = keys.get(1); - Integer key3 = keys.get(2); - - try (IgniteTx tx = startTx(concurrency, isolation);) { - log.info("Do tx remove1."); - - cache.remove(key2, key2); - - log.info("Do tx remove2."); - - cache.remove(key3, key3); - - expData.add(new ExpectedData("deleteAll", new HashMap<>(), null)); - expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, "deleteAll"), null)); - - log.info("Do tx commit."); - - tx.commit(); - } - - assertEquals(0, expData.size()); - } - - /** - * @param concurrency Concurrency mode. - * @param isolation Isolation mode. - * @return Transaction. - */ - private IgniteTx startTx(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) { - IgniteTransactions txs = ignite(0).transactions(); - - if (concurrency == null) - return txs.txStart(); - - return txs.txStart(concurrency, isolation); - } - - /** - * @throws Exception If failed. 
- */ - public void testSessionCrossCacheTx() throws Exception { - IgniteCache<Object, Object> cache0 = ignite(0).jcache(null); - - IgniteCache<Object, Object> cache1 = ignite(0).jcache(CACHE_NAME1); - - Integer key1 = primaryKey(cache0); - Integer key2 = primaryKeys(cache1, 1, key1 + 1).get(0); - - try (IgniteTx tx = startTx(null, null)) { - cache1.put(key2, key2); - - cache0.put(key1, key1); - - expData.add(new ExpectedData("writeAll", new HashMap<>(), null)); - expData.add(new ExpectedData("txEnd", F.<Object, Object>asMap(0, "writeAll"), null)); - - tx.commit(); - } - - assertEquals(0, expData.size()); - } - - /** - * - */ - static class ExpectedData { - /** */ - private final String expMtd; - - /** */ - private final Map<Object, Object> expProps; - - /** */ - private final String expCacheName; - - /** - * @param expMtd Expected method. - * @param expProps Expected properties. - * @param expCacheName Expected cache name. - */ - public ExpectedData(String expMtd, Map<Object, Object> expProps, String expCacheName) { - this.expMtd = expMtd; - this.expProps = expProps; - this.expCacheName = expCacheName; - } - } - - /** - * - */ - private class TestStore extends CacheStore<Object, Object> { - /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... 
args) { - fail(); - } - - /** {@inheritDoc} */ - @Override public void txEnd(boolean commit) throws CacheWriterException { - log.info("Tx end [commit=" + commit + ", tx=" + session().transaction() + ']'); - - checkSession("txEnd"); - } - - /** {@inheritDoc} */ - @Override public Object load(Object key) throws CacheLoaderException { - log.info("Load [key=" + key + ", tx=" + session().transaction() + ']'); - - checkSession("load"); - - return key; - } - - /** {@inheritDoc} */ - @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException { - log.info("LoadAll [keys=" + keys + ", tx=" + session().transaction() + ']'); - - checkSession("loadAll"); - - Map<Object, Object> loaded = new HashMap<>(); - - for (Object key : keys) - loaded.put(key, key); - - return loaded; - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException { - log.info("Write [write=" + entry + ", tx=" + session().transaction() + ']'); - - checkSession("write"); - } - - /** {@inheritDoc} */ - @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) throws CacheWriterException { - log.info("WriteAll: [writeAll=" + entries + ", tx=" + session().transaction() + ']'); - - checkSession("writeAll"); - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) throws CacheWriterException { - log.info("Delete [key=" + key + ", tx=" + session().transaction() + ']'); - - checkSession("delete"); - } - - /** {@inheritDoc} */ - @Override public void deleteAll(Collection<?> keys) throws CacheWriterException { - log.info("DeleteAll [keys=" + keys + ", tx=" + session().transaction() + ']'); - - checkSession("deleteAll"); - } - - /** - * @param mtd Called stored method. 
- */ - private void checkSession(String mtd) { - assertFalse(expData.isEmpty()); - - ExpectedData exp = expData.remove(0); - - assertEquals(exp.expMtd, mtd); - - CacheStoreSession ses = session(); - - assertNotNull(ses); - - assertNotNull(ses.transaction()); - - Map<Object, Object> props = ses.properties(); - - assertNotNull(props); - - assertEquals(exp.expProps, props); - - props.put(props.size(), mtd); - - assertEquals(exp.expCacheName, ses.cacheName()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java new file mode 100644 index 0000000..99900cb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.integration; + +import org.apache.ignite.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.util.typedef.*; + +import java.util.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * Store session test configured with {@code TRANSACTIONAL} atomicity, {@code PARTITIONED} cache mode, + * {@code PARTITIONED_ONLY} distribution mode and a {@code gridCount()} of 3. + * <p> + * Each scenario runs cache operations inside a transaction, queues the store calls it expects into + * {@code expData} before committing, and asserts that {@code expData} is empty after the commit. + * NOTE(review): the expectations are presumably consumed by the test store of the abstract base + * class, which is not visible here - confirm. + */ +public class IgniteCacheTxStoreSessionTest extends IgniteCacheStoreSessionAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } + + /** + * Runs the put, remove and put/remove scenarios first with {@code null} concurrency and isolation + * (the transaction is then started through the no-argument {@code txStart()}), and afterwards with + * explicit {@code PESSIMISTIC} concurrency and {@code REPEATABLE_READ} isolation. With {@code null} + * settings the put scenario is additionally run against the cache named {@code CACHE_NAME1}. + * + * @throws Exception If failed. + */ + public void testStoreSessionTx() throws Exception { + testTxPut(jcache(0), null, null); + + testTxPut(ignite(0).jcache(CACHE_NAME1), null, null); + + testTxRemove(null, null); + + testTxPutRemove(null, null); + + for (IgniteTxConcurrency concurrency : F.asList(PESSIMISTIC)) { + for (IgniteTxIsolation isolation : F.asList(REPEATABLE_READ)) { + testTxPut(jcache(0), concurrency, isolation); + + testTxRemove(concurrency, isolation); + + testTxPutRemove(concurrency, isolation); + } + } + } + + /** + * Puts two keys and removes a third one through {@code jcache(0)} in a single transaction. + * Before the commit it queues expectations for {@code writeAll}, {@code delete} and {@code txEnd}, + * in that order; the expected property map of each entry lists the names of the calls queued + * before it, keyed by position. + * + * @param concurrency Concurrency mode. + * @param isolation Isolation mode. + * @throws Exception If failed. 
+ */ + private void testTxPutRemove(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { + log.info("Test tx put/remove [concurrency=" + concurrency + ", isolation=" + isolation + ']'); + + IgniteCache<Integer, Integer> cache = jcache(0); + + List<Integer> keys = testKeys(cache, 3); + + Integer key1 = keys.get(0); + Integer key2 = keys.get(1); + Integer key3 = keys.get(2); + + try (IgniteTx tx = startTx(concurrency, isolation)) { + log.info("Do tx put1."); + + cache.put(key1, key1); + + log.info("Do tx put2."); + + cache.put(key2, key2); + + log.info("Do tx remove."); + + cache.remove(key3); + + expData.add(new ExpectedData(true, "writeAll", new HashMap<>(), null)); + expData.add(new ExpectedData(true, "delete", F.<Object, Object>asMap(0, "writeAll"), null)); + expData.add(new ExpectedData(true, "txEnd", F.<Object, Object>asMap(0, "writeAll", 1, "delete"), null)); + + log.info("Do tx commit."); + + tx.commit(); + } + + assertEquals(0, expData.size()); + } + + /** + * Runs two transactions against the given cache. The first one gets and then puts a single key + * and queues expectations for {@code write} followed by {@code txEnd}; the second one puts two + * other keys and queues expectations for {@code writeAll} followed by {@code txEnd}. + * Every expectation is created with {@code cache.getName()} as its last argument. + * + * @param cache Cache. + * @param concurrency Concurrency mode. + * @param isolation Isolation mode. + * @throws Exception If failed. 
+ */ + private void testTxPut(IgniteCache<Object, Object> cache, + IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation) throws Exception { + log.info("Test tx put [concurrency=" + concurrency + ", isolation=" + isolation + ']'); + + List<Integer> keys = testKeys(cache, 3); + + Integer key1 = keys.get(0); + + try (IgniteTx tx = startTx(concurrency, isolation)) { + log.info("Do tx get."); + + cache.get(key1); + + log.info("Do tx put."); + + cache.put(key1, key1); + + expData.add(new ExpectedData(true, "write", new HashMap<>(), cache.getName())); + expData.add(new ExpectedData(true, "txEnd", F.<Object, Object>asMap(0, "write"), cache.getName())); + + log.info("Do tx commit."); + + tx.commit(); + } + + assertEquals(0, expData.size()); + + Integer key2 = keys.get(1); + Integer key3 = keys.get(2); + + try (IgniteTx tx = startTx(concurrency, isolation)) { + log.info("Do tx put1."); + + cache.put(key2, key2); + + log.info("Do tx put2."); + + cache.put(key3, key3); + + expData.add(new ExpectedData(true, "writeAll", new HashMap<>(), cache.getName())); + expData.add(new ExpectedData(true, "txEnd", F.<Object, Object>asMap(0, "writeAll"), cache.getName())); + + log.info("Do tx commit."); + + tx.commit(); + } + + assertEquals(0, expData.size()); + } + + /** + * Runs two transactions against {@code jcache(0)}, both using the two-argument + * {@code remove(key, value)}. The first one gets and then removes a single key and queues + * expectations for {@code delete} followed by {@code txEnd}; the second one removes two other + * keys and queues expectations for {@code deleteAll} followed by {@code txEnd}. + * + * @param concurrency Concurrency mode. + * @param isolation Isolation mode. + * @throws Exception If failed. 
+ */ + private void testTxRemove(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { + log.info("Test tx remove [concurrency=" + concurrency + ", isolation=" + isolation + ']'); + + IgniteCache<Integer, Integer> cache = jcache(0); + + List<Integer> keys = testKeys(cache, 3); + + Integer key1 = keys.get(0); + + try (IgniteTx tx = startTx(concurrency, isolation)) { + log.info("Do tx get."); + + cache.get(key1); + + log.info("Do tx remove."); + + cache.remove(key1, key1); + + expData.add(new ExpectedData(true, "delete", new HashMap<>(), null)); + expData.add(new ExpectedData(true, "txEnd", F.<Object, Object>asMap(0, "delete"), null)); + + log.info("Do tx commit."); + + tx.commit(); + } + + assertEquals(0, expData.size()); + + Integer key2 = keys.get(1); + Integer key3 = keys.get(2); + + try (IgniteTx tx = startTx(concurrency, isolation)) { + log.info("Do tx remove1."); + + cache.remove(key2, key2); + + log.info("Do tx remove2."); + + cache.remove(key3, key3); + + expData.add(new ExpectedData(true, "deleteAll", new HashMap<>(), null)); + expData.add(new ExpectedData(true, "txEnd", F.<Object, Object>asMap(0, "deleteAll"), null)); + + log.info("Do tx commit."); + + tx.commit(); + } + + assertEquals(0, expData.size()); + } + + /** + * Starts a transaction through {@code ignite(0).transactions()}. When {@code concurrency} is + * {@code null} the no-argument {@code txStart()} is used and {@code isolation} is ignored; + * otherwise both values are passed to {@code txStart(concurrency, isolation)}. + * + * @param concurrency Concurrency mode. + * @param isolation Isolation mode. + * @return Transaction. + */ + private IgniteTx startTx(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) { + IgniteTransactions txs = ignite(0).transactions(); + + if (concurrency == null) + return txs.txStart(); + + return txs.txStart(concurrency, isolation); + } + + /** + * Updates two caches ({@code jcache(null)} and {@code CACHE_NAME1}) inside one transaction, + * once in each order. Each transaction queues expectations for a single {@code writeAll} + * followed by {@code txEnd}, created with {@code null} as the last argument. + * + * @throws Exception If failed. 
+ */ + public void testSessionCrossCacheTx() throws Exception { + IgniteCache<Object, Object> cache0 = ignite(0).jcache(null); + + IgniteCache<Object, Object> cache1 = ignite(0).jcache(CACHE_NAME1); + + Integer key1 = primaryKey(cache0); + Integer key2 = primaryKeys(cache1, 1, key1 + 1).get(0); + + try (IgniteTx tx = startTx(null, null)) { + cache0.put(key1, 1); + + cache1.put(key2, 0); + + expData.add(new ExpectedData(true, "writeAll", new HashMap<>(), null)); + expData.add(new ExpectedData(true, "txEnd", F.<Object, Object>asMap(0, "writeAll"), null)); + + tx.commit(); + } + + assertEquals(0, expData.size()); + + try (IgniteTx tx = startTx(null, null)) { + cache1.put(key1, 1); + + cache0.put(key2, 0); + + expData.add(new ExpectedData(true, "writeAll", new HashMap<>(), null)); + expData.add(new ExpectedData(true, "txEnd", F.<Object, Object>asMap(0, "writeAll"), null)); + + tx.commit(); + } + + assertEquals(0, expData.size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/cache/store/GridCacheLoadOnlyStoreAdapterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/cache/store/GridCacheLoadOnlyStoreAdapterSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/cache/store/GridCacheLoadOnlyStoreAdapterSelfTest.java index 3a761f8..931f428 100644 --- a/modules/core/src/test/java/org/gridgain/grid/cache/store/GridCacheLoadOnlyStoreAdapterSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/cache/store/GridCacheLoadOnlyStoreAdapterSelfTest.java @@ -50,6 +50,7 @@ public class GridCacheLoadOnlyStoreAdapterSelfTest extends GridCacheAbstractSelf cfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); cfg.setReadThrough(true); cfg.setWriteThrough(true); + cfg.setLoadPreviousValue(true); return cfg; } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractSelfTest.java index 4860f4d..e70ca7a 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractSelfTest.java @@ -239,6 +239,7 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { cfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); cfg.setReadThrough(true); cfg.setWriteThrough(true); + cfg.setLoadPreviousValue(true); } cfg.setSwapEnabled(swapEnabled()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreAbstractTest.java index 0bb03b8..3c9c1b2 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreAbstractTest.java @@ -95,6 +95,7 @@ public abstract class GridCacheBasicStoreAbstractTest extends GridCommonAbstract cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); cc.setReadThrough(true); cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); 
c.setCacheConfiguration(cc); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java index bd8e8ed..88d8638 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java @@ -81,6 +81,7 @@ public abstract class GridCacheBasicStoreMultithreadedAbstractTest extends GridC cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); cc.setReadThrough(true); cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); c.setCacheConfiguration(cc); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheConfigurationConsistencySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheConfigurationConsistencySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheConfigurationConsistencySelfTest.java index d54cd41..db6f67c 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheConfigurationConsistencySelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheConfigurationConsistencySelfTest.java @@ -746,6 +746,7 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac 
cfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); cfg.setReadThrough(true); cfg.setWriteThrough(true); + cfg.setLoadPreviousValue(true); return null; } @@ -787,6 +788,7 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); cc.setReadThrough(true); cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); return null; } @@ -825,6 +827,7 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); cc.setReadThrough(true); cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); return null; } @@ -869,6 +872,7 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); cc.setReadThrough(true); cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); return null; } @@ -915,6 +919,7 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); cc.setReadThrough(true); cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f7984a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java index 0f2de67..2ee15b4 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java +++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java @@ -94,6 +94,7 @@ public abstract class GridCacheGetAndTransformStoreAbstractTest extends GridComm cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); cc.setReadThrough(true); cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); c.setCacheConfiguration(cc);