# GG-9973: Finished.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/038f610e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/038f610e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/038f610e Branch: refs/heads/ignite-gg-9973 Commit: 038f610e866ce13a5f4f76d2361d7e6979f8c203 Parents: 29377e9 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Thu Apr 2 15:03:26 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Thu Apr 2 15:03:26 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 31 +- .../processors/cache/GridCacheContext.java | 8 +- .../processors/cache/GridCacheMapEntry.java | 28 +- .../processors/cache/GridCacheProcessor.java | 11 +- .../distributed/dht/GridDhtCacheAdapter.java | 9 +- .../distributed/dht/GridDhtCacheEntry.java | 4 +- .../distributed/dht/GridDhtLocalPartition.java | 4 +- .../distributed/dht/GridDhtLockFuture.java | 11 +- .../dht/atomic/GridDhtAtomicCache.java | 20 +- .../local/atomic/GridLocalAtomicCache.java | 7 +- .../cache/store/CacheOsStoreManager.java | 60 + .../cache/store/CacheStoreManager.java | 182 +++ .../cache/store/GridCacheOsStoreManager.java | 1202 ------------------ .../store/GridCacheStoreManagerAdapter.java | 1111 ++++++++++++++++ .../cache/store/GridCacheWriteBehindStore.java | 8 +- .../cache/transactions/IgniteTxAdapter.java | 4 +- .../transactions/IgniteTxLocalAdapter.java | 38 +- .../cacheobject/IgniteCacheObjectProcessor.java | 6 - .../IgniteCacheObjectProcessorImpl.java | 5 - .../processors/plugin/CachePluginManager.java | 8 +- .../resources/META-INF/classnames.properties | 8 +- .../loadtests/hashmap/GridCacheTestContext.java | 7 +- 22 files changed, 1461 insertions(+), 1311 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index eb1a98f..32e4572 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -993,7 +993,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @throws IgniteCheckedException In case of any errors. */ @Nullable private Object peekDb(KeyCacheObject key) throws IgniteCheckedException { - return ctx.store().loadFromStore(ctx.tm().localTxx(), key); + return ctx.store().load(ctx.tm().localTxx(), key); } /** @@ -1581,7 +1581,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return ctx.closures().callLocalSafe(new GPC<Object>() { @Nullable @Override public Object call() { try { - ctx.store().loadAllFromStore(tx, keys, vis); + ctx.store().loadAll(tx, keys, vis); } catch (IgniteCheckedException e) { throw new GridClosureException(e); @@ -2096,11 +2096,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return new GridEmbeddedFuture( ctx.closures().callLocalSafe(ctx.projectSafe(new GPC<Map<K1, V1>>() { @Override public Map<K1, V1> call() throws Exception { - ctx.store().loadAllFromStore(null/*tx*/, loadKeys.keySet(), new CI2<KeyCacheObject, Object>() { + ctx.store().loadAll(null/*tx*/, loadKeys.keySet(), new CI2<KeyCacheObject, Object>() { /** New version for all new entries. */ private GridCacheVersion nextVer; - @Override public void apply(KeyCacheObject key, Object val) { + @Override + public void apply(KeyCacheObject key, Object val) { GridCacheVersion ver = loadKeys.get(key); if (ver == null) { @@ -2146,13 +2147,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ctx.evicts().touch(entry, topVer); break; - } - catch (GridCacheEntryRemovedException ignore) { + } catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) log.debug("Got removed entry during getAllAsync (will retry): " + entry); - } - catch (IgniteCheckedException e) { + } catch (IgniteCheckedException e) { // Wrap errors (will be unwrapped). throw new GridClosureException(e); } @@ -3676,7 +3675,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry(); - if (ctx.store().isLocalStore()) { + if (ctx.store().isLocal()) { DataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex()); try { @@ -3789,7 +3788,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ExpiryPolicy plc = prj != null ? prj.expiry() : null; if (replaceExisting) { - if (ctx.store().isLocalStore()) { + if (ctx.store().isLocal()) { Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (nodes.isEmpty()) @@ -3837,8 +3836,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); - ctx.store().loadAllFromStore(null, keys0, new CIX2<KeyCacheObject, Object>() { - @Override public void applyx(KeyCacheObject key, Object val) { + ctx.store().loadAll(null, keys0, new CIX2<KeyCacheObject, Object>() { + @Override + public void applyx(KeyCacheObject key, Object val) { col.add(new DataStreamerEntry(key, ctx.toCacheObject(val))); if (col.size() == ldr.perNodeBufferSize()) { @@ -3870,7 +3870,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); - if (ctx.store().isLocalStore()) { + if (ctx.store().isLocal()) { DataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex()); try { @@ -3890,8 +3890,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, // Version for all loaded entries. final GridCacheVersion ver0 = ctx.versions().nextForLoad(); - ctx.store().loadAllFromStore(null, keys0, new CI2<KeyCacheObject, Object>() { - @Override public void apply(KeyCacheObject key, Object val) { + ctx.store().loadAll(null, keys0, new CI2<KeyCacheObject, Object>() { + @Override + public void apply(KeyCacheObject key, Object val) { long ttl = CU.ttlForLoad(plc0); if (ttl == CU.TTL_ZERO) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 28ec91f..dfa82e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -128,7 +128,7 @@ public class GridCacheContext<K, V> implements Externalizable { private GridCacheTtlManager ttlMgr; /** Store manager. */ - private GridCacheOsStoreManager storeMgr; + private CacheStoreManager storeMgr; /** Replication manager. */ private GridCacheDrManager drMgr; @@ -234,7 +234,7 @@ public class GridCacheContext<K, V> implements Externalizable { GridCacheEventManager evtMgr, GridCacheSwapManager swapMgr, - GridCacheOsStoreManager storeMgr, + CacheStoreManager storeMgr, GridCacheEvictionManager evictMgr, GridCacheQueryManager<K, V> qryMgr, CacheContinuousQueryManager contQryMgr, @@ -883,7 +883,7 @@ public class GridCacheContext<K, V> implements Externalizable { * are set to {@code true} or the store is local. */ public boolean writeToStoreFromDht() { - return store().isLocalStore() || cacheCfg.isWriteBehindEnabled(); + return store().isLocal() || cacheCfg.isWriteBehindEnabled(); } /** @@ -952,7 +952,7 @@ public class GridCacheContext<K, V> implements Externalizable { /** * @return Store manager. */ - public GridCacheOsStoreManager store() { + public CacheStoreManager store() { return storeMgr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index d7fbde9..542d0ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -590,7 +590,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { @SuppressWarnings({"RedundantTypeArguments"}) @Nullable protected Object readThrough(@Nullable IgniteInternalTx tx, KeyCacheObject key, boolean reload, UUID subjId, String taskName) throws IgniteCheckedException { - return cctx.store().loadFromStore(tx, key); + return cctx.store().load(tx, key); } /** {@inheritDoc} */ @@ -1077,7 +1077,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Persist outside of synchronization. The correctness of the // value will be handled by current transaction. if (writeThrough) - cctx.store().putToStore(tx, keyValue(false), CU.value(val, cctx, false), newVer); + cctx.store().put(tx, keyValue(false), CU.value(val, cctx, false), newVer); if (intercept) cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0)); @@ -1235,7 +1235,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Persist outside of synchronization. The correctness of the // value will be handled by current transaction. if (writeThrough) - cctx.store().removeFromStore(tx, keyValue(false)); + cctx.store().remove(tx, keyValue(false)); if (!cctx.deferredDelete()) { boolean marked = false; @@ -1489,7 +1489,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (writeThrough) // Must persist inside synchronization in non-tx mode. - cctx.store().putToStore(null, keyValue(false), CU.value(updated, cctx, false), ver); + cctx.store().put(null, keyValue(false), CU.value(updated, cctx, false), ver); // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. @@ -1523,7 +1523,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { else { if (writeThrough) // Must persist inside synchronization in non-tx mode. - cctx.store().removeFromStore(null, keyValue(false)); + cctx.store().remove(null, keyValue(false)); boolean hasValPtr = valPtr != 0; @@ -1724,10 +1724,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (val == null) { assert deletedUnlocked(); - cctx.store().removeFromStore(null, keyValue(false)); + cctx.store().remove(null, keyValue(false)); } else - cctx.store().putToStore(null, keyValue(false), CU.value(val, cctx, false), ver); + cctx.store().put(null, keyValue(false), CU.value(val, cctx, false), ver); } return new GridCacheUpdateAtomicResult(false, @@ -1776,10 +1776,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (val == null) { assert deletedUnlocked(); - cctx.store().removeFromStore(null, keyValue(false)); + cctx.store().remove(null, keyValue(false)); } else - cctx.store().putToStore(null, keyValue(false), CU.value(val, cctx, false), ver); + cctx.store().put(null, keyValue(false), CU.value(val, cctx, false), ver); } else { if (log.isDebugEnabled()) @@ -2029,7 +2029,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Try write-through. if (writeThrough) // Must persist inside synchronization in non-tx mode. - cctx.store().putToStore(null, keyValue(false), CU.value(updated, cctx, false), newVer); + cctx.store().put(null, keyValue(false), CU.value(updated, cctx, false), newVer); if (!hadVal) { boolean new0 = isNew(); @@ -2099,7 +2099,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (writeThrough) // Must persist inside synchronization in non-tx mode. - cctx.store().removeFromStore(null, keyValue(false)); + cctx.store().remove(null, keyValue(false)); // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. @@ -3085,7 +3085,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return null; } - return cctx.toCacheObject(cctx.store().loadFromStore(cctx.tm().localTxx(), key)); + return cctx.toCacheObject(cctx.store().load(cctx.tm().localTxx(), key)); } /** @@ -3225,9 +3225,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { cctx.dataStructures().onEntryUpdated(key, false); } - if (cctx.store().isLocalStore()) { + if (cctx.store().isLocal()) { if (val != null) - cctx.store().putToStore(null, keyValue(false), CU.value(val, cctx, false), ver); + cctx.store().put(null, keyValue(false), CU.value(val, cctx, false), ver); } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5761bdb..360bba3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1021,10 +1021,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { pluginMgr.validate(); - CacheConflictResolutionManager rslvrMgr = pluginMgr.createComponent(CacheConflictResolutionManager.class); - GridCacheDrManager drMgr = pluginMgr.createComponent(GridCacheDrManager.class); + CacheConflictResolutionManager rslvrMgr = pluginMgr.createComponent(ctx, cfg, + CacheConflictResolutionManager.class); + GridCacheDrManager drMgr = pluginMgr.createComponent(ctx, cfg,GridCacheDrManager.class); - GridCacheOsStoreManager storeMgr = new GridCacheOsStoreManager(ctx, sesHolders, cfgStore, cfg); + CacheStoreManager storeMgr = pluginMgr.createComponent(ctx, cfg, CacheStoreManager.class); + + storeMgr.initialize(cfgStore, sesHolders); GridCacheContext<?, ?> cacheCtx = new GridCacheContext( ctx, @@ -1151,8 +1154,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { swapMgr = new GridCacheSwapManager(true); evictMgr = new GridCacheEvictionManager(); evtMgr = new GridCacheEventManager(); - drMgr = pluginMgr.createComponent(GridCacheDrManager.class); pluginMgr = new CachePluginManager(ctx, cfg); + drMgr = pluginMgr.createComponent(ctx, cfg, GridCacheDrManager.class); cacheCtx = new GridCacheContext( ctx, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 6c27566..43bf256 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -360,7 +360,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @Override public void localLoad(Collection<? extends K> keys, final ExpiryPolicy plc) throws IgniteCheckedException { - if (ctx.store().isLocalStore()) { + if (ctx.store().isLocal()) { super.localLoad(keys, plc); return; @@ -377,8 +377,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); - ctx.store().loadAllFromStore(null, keys0, new CI2<KeyCacheObject, Object>() { - @Override public void apply(KeyCacheObject key, Object val) { + ctx.store().loadAll(null, keys0, new CI2<KeyCacheObject, Object>() { + @Override + public void apply(KeyCacheObject key, Object val) { loadEntry(key, val, ver0, null, topVer, replicate, plc0); } }); @@ -386,7 +387,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @Override public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args) throws IgniteCheckedException { - if (ctx.store().isLocalStore()) { + if (ctx.store().isLocal()) { super.localLoadCache(p, args); return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 7195d1c..e6d5173 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -564,8 +564,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { log.debug("Entry has been cleared from swap storage: " + this); } - if (cctx.store().isLocalStore()) - cctx.store().removeFromStore(null, keyValue(false)); + if (cctx.store().isLocal()) + cctx.store().remove(null, keyValue(false)); rmv = true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 4e72bd1..c433698 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -494,7 +494,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> try { GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> it = cctx.swap().iterator(id); - boolean isLocStore = cctx.store().isLocalStore(); + boolean isLocStore = cctx.store().isLocal(); if (it != null) { // We can safely remove these values because no entries will be created for evicted partition. @@ -508,7 +508,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> cctx.swap().remove(key); if (isLocStore) - cctx.store().removeFromStore(null, key.value(cctx.cacheObjectContext(), false)); + cctx.store().remove(null, key.value(cctx.cacheObjectContext(), false)); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 99fb724..24bdc88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -945,11 +945,12 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo } try { - cctx.store().loadAllFromStore( + cctx.store().loadAll( null, loadMap.keySet(), new CI2<KeyCacheObject, Object>() { - @Override public void apply(KeyCacheObject key, Object val) { + @Override + public void apply(KeyCacheObject key, Object val) { // No value loaded from store. if (val == null) return; @@ -960,12 +961,10 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo CacheObject val0 = cctx.toCacheObject(val); entry0.initialValue(val0, ver, 0, 0, false, topVer, GridDrType.DR_LOAD); - } - catch (GridCacheEntryRemovedException e) { + } catch (GridCacheEntryRemovedException e) { assert false : "Should not get removed exception while holding lock on entry " + "[entry=" + entry0 + ", e=" + e + ']'; - } - catch (IgniteCheckedException e) { + } catch (IgniteCheckedException e) { onDone(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index c376fa0..e657051 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1135,7 +1135,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (keys.size() > 1 && // Several keys ... writeThrough() && // and store is enabled ... - !ctx.store().isLocalStore() && // and this is not local store ... + !ctx.store().isLocal() && // and this is not local store ... !ctx.dr().receiveEnabled() // and no DR. ) { // This method can only be used when there are no replicated entries in the batch. @@ -1614,8 +1614,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (needReload != null) { final Map<KeyCacheObject, Integer> idxMap = needReload; - ctx.store().loadAllFromStore(null, needReload.keySet(), new CI2<KeyCacheObject, Object>() { - @Override public void apply(KeyCacheObject k, Object v) { + ctx.store().loadAll(null, needReload.keySet(), new CI2<KeyCacheObject, Object>() { + @Override + public void apply(KeyCacheObject k, Object v) { Integer idx = idxMap.get(k); if (idx != null) { @@ -1624,12 +1625,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheVersion ver = entry.version(); entry.versionedValue(ctx.toCacheObject(v), null, ver); - } - catch (GridCacheEntryRemovedException e) { + } catch (GridCacheEntryRemovedException e) { assert false : "Entry should not get obsolete while holding lock [entry=" + entry + ", e=" + e + ']'; - } - catch (IgniteCheckedException e) { + } catch (IgniteCheckedException e) { throw new IgniteException(e); } } @@ -1917,8 +1916,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { putMap; try { - ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() { - @Override public IgniteBiTuple<Object, GridCacheVersion> apply(Object v) { + ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() { + @Override + public IgniteBiTuple<Object, GridCacheVersion> apply(Object v) { return F.t(v, ver); } })); @@ -1940,7 +1940,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { rmvKeys; try { - ctx.store().removeAllFromStore(null, storeKeys); + ctx.store().removeAll(null, storeKeys); } catch (CacheStorePartialUpdateException e) { storeErr = e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index a9a6f23..d31291f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1306,8 +1306,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { try { if (putMap != null) { try { - ctx.store().putAllToStore(null, F.viewReadOnly(putMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() { - @Override public IgniteBiTuple<Object, GridCacheVersion> apply(Object v) { + ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() { + @Override + public IgniteBiTuple<Object, GridCacheVersion> apply(Object v) { return F.t(v, ver); } })); @@ -1320,7 +1321,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { } else { try { - ctx.store().removeAllFromStore(null, rmvKeys); + ctx.store().removeAll(null, rmvKeys); } catch (CacheStorePartialUpdateException e) { storeErr = e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java new file mode 100644 index 0000000..f59906f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; + +/** + * Default store manager implementation. + */ +public class CacheOsStoreManager extends GridCacheStoreManagerAdapter { + /** Ignite context. */ + private final GridKernalContext ctx; + + /** Cache configuration. */ + private final CacheConfiguration cfg; + + /** + * Constructor. + * + * @param ctx Ignite context. + * @param cfg Cache configuration. + * @throws IgniteCheckedException + */ + public CacheOsStoreManager(GridKernalContext ctx, CacheConfiguration cfg) { + this.ctx = ctx; + this.cfg = cfg; + } + + /** {@inheritDoc} */ + @Override protected GridKernalContext igniteContext() { + return ctx; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration() { + return cfg; + } + + /** {@inheritDoc} */ + @Override protected boolean convertPortable() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java new file mode 100644 index 0000000..d9f50ac --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Cache store manager interface. + */ +public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> { + /** + * Initialize store manager. + * + * @param cfgStore Actual store. + * @param sesHolders Session holders. + * @throws org.apache.ignite.IgniteCheckedException If failed. + */ + public void initialize(@Nullable CacheStore<?, ?> cfgStore, Map<CacheStore, ThreadLocal> sesHolders) + throws IgniteCheckedException; + + /** + * @return {@code true} If store configured. + */ + public boolean configured(); + + /** + * @return Wrapped store. + */ + public CacheStore<Object, Object> store(); + + /** + * @return Unwrapped store provided in configuration. + */ + public CacheStore<?, ?> configuredStore(); + + /** + * @return {@code true} If local store is configured. + */ + public boolean isLocal(); + + /** + * @return {@code True} is write-through is enabled. + */ + public boolean isWriteThrough(); + + /** + * @return Whether DHT transaction can write to store from DHT. + */ + public boolean isWriteToStoreFromDht(); + + /** + * Loads data from persistent store. + * + * @param tx Cache transaction. + * @param key Cache key. + * @return Loaded value, possibly <tt>null</tt>. + * @throws IgniteCheckedException If data loading failed. + */ + @Nullable public Object load(@Nullable IgniteInternalTx tx, KeyCacheObject key) throws IgniteCheckedException; + + /** + * Loads data from persistent store. + * + * @param tx Cache transaction. + * @param keys Cache keys. + * @param vis Closure. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If data loading failed. + */ + public boolean loadAll(@Nullable IgniteInternalTx tx, Collection<? extends KeyCacheObject> keys, + IgniteBiInClosure<KeyCacheObject, Object> vis) throws IgniteCheckedException; + + /** + * @param tx Cache transaction. + * @param keys Cache keys. + * @param vis Closure to apply for loaded elements. + * @throws IgniteCheckedException If data loading failed. + */ + public void localStoreLoadAll(@Nullable IgniteInternalTx tx, Collection<? extends KeyCacheObject> keys, + final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis) throws IgniteCheckedException; + + /** + * Loads data from persistent store. + * + * @param vis Closer to cache loaded elements. + * @param args User arguments. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If data loading failed. + */ + public boolean loadCache(final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis, Object[] args) + throws IgniteCheckedException; + + /** + * Puts key-value pair into storage. + * + * @param tx Cache transaction. + * @param key Key. + * @param val Value. + * @param ver Version. + * @return {@code true} If there is a persistent storage. + * @throws IgniteCheckedException If storage failed. + */ + public boolean put(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver) + throws IgniteCheckedException; + + /** + * Puts key-value pair into storage. + * + * @param tx Cache transaction. + * @param map Map. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If storage failed. + */ + public boolean putAll(@Nullable IgniteInternalTx tx, Map<Object, IgniteBiTuple<Object, GridCacheVersion>> map) + throws IgniteCheckedException; + + /** + * @param tx Cache transaction. + * @param key Key. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If storage failed. + */ + public boolean remove(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException; + + /** + * @param tx Cache transaction. + * @param keys Key. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If storage failed. + */ + public boolean removeAll(@Nullable IgniteInternalTx tx, Collection<Object> keys) + throws IgniteCheckedException; + + /** + * @param tx Transaction. + * @param commit Commit. + * @throws IgniteCheckedException If failed. + */ + public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException; + + /** + * End session initiated by write-behind store. + */ + public void writeBehindSessionInit(); + + /** + * End session initiated by write-behind store. + * + * @param threwEx If exception was thrown. + * @throws IgniteCheckedException If failed. + */ + public void writeBehindSessionEnd(boolean threwEx) throws IgniteCheckedException; + + /** + * @throws IgniteCheckedException If failed. + */ + public void forceFlush() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java deleted file mode 100644 index 9fdbd8e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java +++ /dev/null @@ -1,1202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.store; - -import org.apache.ignite.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.lifecycle.*; -import org.apache.ignite.transactions.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.integration.*; -import java.util.*; - -/** - * Store manager. - */ -@SuppressWarnings("AssignmentToCatchBlockParameter") -public class GridCacheOsStoreManager extends GridCacheManagerAdapter { - /** */ - private static final UUID SES_ATTR = UUID.randomUUID(); - - /** */ - private final CacheStore<Object, Object> store; - - /** */ - private final CacheStore<?, ?> cfgStore; - - /** */ - private final CacheStoreBalancingWrapper<Object, Object> singleThreadGate; - - /** */ - private final ThreadLocal<SessionData> sesHolder; - - /** */ - private final boolean locStore; - - /** */ - private final boolean writeThrough; - - /** */ - private boolean convertPortable; - - /** - * @param ctx Kernal context. - * @param sesHolders Session holders map to use the same session holder for different managers if they use - * the same store instance. - * @param cfgStore Store provided in configuration. - * @param cfg Cache configuration. - * @throws IgniteCheckedException In case of error. - */ - @SuppressWarnings("unchecked") - public GridCacheOsStoreManager( - GridKernalContext ctx, - Map<CacheStore, ThreadLocal> sesHolders, - @Nullable CacheStore<Object, Object> cfgStore, - CacheConfiguration cfg - ) throws IgniteCheckedException { - this.cfgStore = cfgStore; - - store = cacheStoreWrapper(ctx, cfgStore, cfg); - - singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store); - - writeThrough = cfg.isWriteThrough(); - - ThreadLocal<SessionData> sesHolder0 = null; - - if (cfgStore != null) { - sesHolder0 = sesHolders.get(cfgStore); - - if (sesHolder0 == null) { - ThreadLocalSession locSes = new ThreadLocalSession(); - - if (ctx.resource().injectStoreSession(cfgStore, locSes)) { - sesHolder0 = locSes.sesHolder; - - sesHolders.put(cfgStore, sesHolder0); - } - } - } - - sesHolder = sesHolder0; - - locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class); - } - - /** - * @return {@code True} is write-through is enabled. - */ - public boolean writeThrough() { - return writeThrough; - } - - /** - * @return Unwrapped store provided in configuration. - */ - public CacheStore<?, ?> configuredStore() { - return cfgStore; - } - - /** - * Creates a wrapped cache store if write-behind cache is configured. - * - * @param ctx Kernal context. - * @param cfgStore Store provided in configuration. - * @param cfg Cache configuration. - * @return Instance if {@link GridCacheWriteBehindStore} if write-behind store is configured, - * or user-defined cache store. - */ - @SuppressWarnings({"unchecked"}) - private CacheStore cacheStoreWrapper(GridKernalContext ctx, - @Nullable CacheStore cfgStore, - CacheConfiguration cfg) { - if (cfgStore == null || !cfg.isWriteBehindEnabled()) - return cfgStore; - - GridCacheWriteBehindStore store = new GridCacheWriteBehindStore(this, - ctx.gridName(), - cfg.getName(), - ctx.log(GridCacheWriteBehindStore.class), - cfgStore); - - store.setFlushSize(cfg.getWriteBehindFlushSize()); - store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount()); - store.setFlushFrequency(cfg.getWriteBehindFlushFrequency()); - store.setBatchSize(cfg.getWriteBehindBatchSize()); - - return store; - } - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - if (store instanceof LifecycleAware) { - try { - // Avoid second start() call on store in case when near cache is enabled. - if (cctx.config().isWriteBehindEnabled()) { - if (!cctx.isNear()) - ((LifecycleAware)store).start(); - } - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to start cache store: " + e, e); - } - } - - convertPortable = !cctx.cacheObjects().keepPortableInStore(cctx.name()); - } - - /** {@inheritDoc} */ - @Override protected void stop0(boolean cancel) { - if (store instanceof LifecycleAware) { - try { - // Avoid second start() call on store in case when near cache is enabled. - if (cctx.config().isWriteBehindEnabled()) { - if (!cctx.isNear()) - ((LifecycleAware)store).stop(); - } - } - catch (Exception e) { - U.error(log(), "Failed to stop cache store.", e); - } - } - } - - /** - * @return Convert-portable flag. - */ - public boolean convertPortable() { - return convertPortable; - } - - /** - * @param convertPortable Convert-portable flag. - */ - public void convertPortable(boolean convertPortable) { - this.convertPortable = convertPortable; - } - - /** - * @return {@code true} If local store is configured. - */ - public boolean isLocalStore() { - return locStore; - } - - /** - * @return {@code true} If store configured. - */ - public boolean configured() { - return store != null; - } - - /** - * Loads data from persistent store. - * - * @param tx Cache transaction. - * @param key Cache key. - * @return Loaded value, possibly <tt>null</tt>. - * @throws IgniteCheckedException If data loading failed. - */ - @SuppressWarnings("unchecked") - @Nullable public Object loadFromStore(@Nullable IgniteInternalTx tx, KeyCacheObject key) - throws IgniteCheckedException { - return loadFromStore(tx, key, true); - } - - /** - * Loads data from persistent store. - * - * @param tx Cache transaction. - * @param key Cache key. - * @param convert Convert flag. - * @return Loaded value, possibly <tt>null</tt>. - * @throws IgniteCheckedException If data loading failed. - */ - @SuppressWarnings("unchecked") - @Nullable private Object loadFromStore(@Nullable IgniteInternalTx tx, - KeyCacheObject key, - boolean convert) - throws IgniteCheckedException { - if (store != null) { - if (key.internal()) - // Never load internal keys from store as they are never persisted. - return null; - - Object storeKey = key.value(cctx.cacheObjectContext(), false); - - if (convertPortable) - storeKey = cctx.unwrapPortableIfNeeded(storeKey, false); - - if (log.isDebugEnabled()) - log.debug("Loading value from store for key: " + storeKey); - - initSession(tx); - - boolean thewEx = true; - - Object val = null; - - try { - val = singleThreadGate.load(storeKey); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (CacheLoaderException e) { - throw new IgniteCheckedException(e); - } - catch (Exception e) { - throw new IgniteCheckedException(new CacheLoaderException(e)); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Loaded value from store [key=" + key + ", val=" + val + ']'); - - if (convert) { - val = convert(val); - - return val; - } - else - return val; - } - - return null; - } - - /** - * @param val Internal value. - * @return User value. - */ - @SuppressWarnings("unchecked") - private Object convert(Object val) { - if (val == null) - return null; - - return locStore ? ((IgniteBiTuple<Object, GridCacheVersion>)val).get1() : val; - } - - /** - * @return Whether DHT transaction can write to store from DHT. - */ - public boolean writeToStoreFromDht() { - return cctx.config().isWriteBehindEnabled() || locStore; - } - - /** - * @param tx Cache transaction. - * @param keys Cache keys. - * @param vis Closure to apply for loaded elements. - * @throws IgniteCheckedException If data loading failed. - */ - public void localStoreLoadAll(@Nullable IgniteInternalTx tx, - Collection<? extends KeyCacheObject> keys, - final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis) - throws IgniteCheckedException { - assert store != null; - assert locStore; - - loadAllFromStore(tx, keys, null, vis); - } - - /** - * Loads data from persistent store. - * - * @param tx Cache transaction. - * @param keys Cache keys. - * @param vis Closure. - * @return {@code True} if there is a persistent storage. - * @throws IgniteCheckedException If data loading failed. - */ - @SuppressWarnings({"unchecked"}) - public boolean loadAllFromStore(@Nullable IgniteInternalTx tx, - Collection<? extends KeyCacheObject> keys, - final IgniteBiInClosure<KeyCacheObject, Object> vis) throws IgniteCheckedException { - if (store != null) { - loadAllFromStore(tx, keys, vis, null); - - return true; - } - else { - for (KeyCacheObject key : keys) - vis.apply(key, null); - } - - return false; - } - - /** - * @param tx Cache transaction. - * @param keys Keys to load. - * @param vis Key/value closure (only one of vis or verVis can be specified). - * @param verVis Key/value/version closure (only one of vis or verVis can be specified). - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - private void loadAllFromStore(@Nullable IgniteInternalTx tx, - Collection<? extends KeyCacheObject> keys, - @Nullable final IgniteBiInClosure<KeyCacheObject, Object> vis, - @Nullable final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> verVis) - throws IgniteCheckedException { - assert vis != null ^ verVis != null; - assert verVis == null || locStore; - - final boolean convert = verVis == null; - - if (!keys.isEmpty()) { - if (keys.size() == 1) { - KeyCacheObject key = F.first(keys); - - if (convert) - vis.apply(key, loadFromStore(tx, key)); - else { - IgniteBiTuple<Object, GridCacheVersion> t = - (IgniteBiTuple<Object, GridCacheVersion>)loadFromStore(tx, key, false); - - if (t != null) - verVis.apply(key, t.get1(), t.get2()); - } - - return; - } - - Collection<Object> keys0; - - if (convertPortable) { - keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() { - @Override public Object apply(KeyCacheObject key) { - return cctx.unwrapPortableIfNeeded(key.value(cctx.cacheObjectContext(), false), false); - } - }); - } - else { - keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() { - @Override public Object apply(KeyCacheObject key) { - return key.value(cctx.cacheObjectContext(), false); - } - }); - } - - if (log.isDebugEnabled()) - log.debug("Loading values from store for keys: " + keys0); - - initSession(tx); - - boolean thewEx = true; - - try { - IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() { - @SuppressWarnings("ConstantConditions") - @Override public void apply(Object k, Object val) { - if (convert) { - Object v = convert(val); - - vis.apply(cctx.toCacheKeyObject(k), v); - } - else { - IgniteBiTuple<Object, GridCacheVersion> v = (IgniteBiTuple<Object, GridCacheVersion>)val; - - if (v != null) - verVis.apply(cctx.toCacheKeyObject(k), v.get1(), v.get2()); - } - } - }; - - if (keys.size() > singleThreadGate.loadAllThreshold()) { - Map<Object, Object> map = store.loadAll(keys0); - - if (map != null) { - for (Map.Entry<Object, Object> e : map.entrySet()) - c.apply(cctx.toCacheKeyObject(e.getKey()), e.getValue()); - } - } - else - singleThreadGate.loadAll(keys0, c); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (CacheLoaderException e) { - throw new IgniteCheckedException(e); - } - catch (Exception e) { - throw new IgniteCheckedException(new CacheLoaderException(e)); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Loaded values from store for keys: " + keys0); - } - } - - /** - * Loads data from persistent store. - * - * @param vis Closer to cache loaded elements. - * @param args User arguments. - * @return {@code True} if there is a persistent storage. - * @throws IgniteCheckedException If data loading failed. - */ - @SuppressWarnings({"ErrorNotRethrown", "unchecked"}) - public boolean loadCache(final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis, Object[] args) - throws IgniteCheckedException { - if (store != null) { - if (log.isDebugEnabled()) - log.debug("Loading all values from store."); - - initSession(null); - - boolean thewEx = true; - - try { - store.loadCache(new IgniteBiInClosure<Object, Object>() { - @Override public void apply(Object k, Object o) { - Object v; - GridCacheVersion ver = null; - - if (locStore) { - IgniteBiTuple<Object, GridCacheVersion> t = (IgniteBiTuple<Object, GridCacheVersion>)o; - - v = t.get1(); - ver = t.get2(); - } - else - v = o; - - KeyCacheObject cacheKey = cctx.toCacheKeyObject(k); - - vis.apply(cacheKey, v, ver); - } - }, args); - - thewEx = false; - } - catch (CacheLoaderException e) { - throw new IgniteCheckedException(e); - } - catch (Exception e) { - throw new IgniteCheckedException(new CacheLoaderException(e)); - } - finally { - endSession(null, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Loaded all values from store."); - - return true; - } - - LT.warn(log, null, "Calling Cache.loadCache() method will have no effect, " + - "CacheConfiguration.getStore() is not defined for cache: " + cctx.namexx()); - - return false; - } - - /** - * Puts key-value pair into storage. - * - * @param tx Cache transaction. - * @param key Key. - * @param val Value. - * @param ver Version. - * @return {@code true} If there is a persistent storage. - * @throws IgniteCheckedException If storage failed. - */ - @SuppressWarnings("unchecked") - public boolean putToStore(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver) - throws IgniteCheckedException { - if (store != null) { - // Never persist internal keys. - if (key instanceof GridCacheInternal) - return true; - - if (convertPortable) { - key = cctx.unwrapPortableIfNeeded(key, false); - val = cctx.unwrapPortableIfNeeded(val, false); - } - - if (log.isDebugEnabled()) - log.debug("Storing value in cache store [key=" + key + ", val=" + val + ']'); - - initSession(tx); - - boolean thewEx = true; - - try { - store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val)); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (CacheWriterException e) { - throw new IgniteCheckedException(e); - } - catch (Exception e) { - throw new IgniteCheckedException(new CacheWriterException(e)); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Stored value in cache store [key=" + key + ", val=" + val + ']'); - - return true; - } - - return false; - } - - /** - * Puts key-value pair into storage. - * - * @param tx Cache transaction. - * @param map Map. - * @return {@code True} if there is a persistent storage. - * @throws IgniteCheckedException If storage failed. - */ - public boolean putAllToStore(@Nullable IgniteInternalTx tx, - Map<Object, IgniteBiTuple<Object, GridCacheVersion>> map) - throws IgniteCheckedException - { - if (F.isEmpty(map)) - return true; - - if (map.size() == 1) { - Map.Entry<Object, IgniteBiTuple<Object, GridCacheVersion>> e = map.entrySet().iterator().next(); - - return putToStore(tx, e.getKey(), e.getValue().get1(), e.getValue().get2()); - } - else { - if (store != null) { - EntriesView entries = new EntriesView((Map)map); - - if (log.isDebugEnabled()) - log.debug("Storing values in cache store [entries=" + entries + ']'); - - initSession(tx); - - boolean thewEx = true; - - try { - store.writeAll(entries); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (Exception e) { - if (!(e instanceof CacheWriterException)) - e = new CacheWriterException(e); - - if (!entries.isEmpty()) { - List<Object> keys = new ArrayList<>(entries.size()); - - for (Cache.Entry<?, ?> entry : entries) - keys.add(entry.getKey()); - - throw new CacheStorePartialUpdateException(keys, e); - } - - throw new IgniteCheckedException(e); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Stored value in cache store [entries=" + entries + ']'); - - return true; - } - - return false; - } - } - - /** - * @param tx Cache transaction. - * @param key Key. - * @return {@code True} if there is a persistent storage. - * @throws IgniteCheckedException If storage failed. - */ - @SuppressWarnings("unchecked") - public boolean removeFromStore(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException { - if (store != null) { - // Never remove internal key from store as it is never persisted. - if (key instanceof GridCacheInternal) - return false; - - if (convertPortable) - key = cctx.unwrapPortableIfNeeded(key, false); - - if (log.isDebugEnabled()) - log.debug("Removing value from cache store [key=" + key + ']'); - - initSession(tx); - - boolean thewEx = true; - - try { - store.delete(key); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (CacheWriterException e) { - throw new IgniteCheckedException(e); - } - catch (Exception e) { - throw new IgniteCheckedException(new CacheWriterException(e)); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Removed value from cache store [key=" + key + ']'); - - return true; - } - - return false; - } - - /** - * @param tx Cache transaction. - * @param keys Key. - * @return {@code True} if there is a persistent storage. - * @throws IgniteCheckedException If storage failed. - */ - @SuppressWarnings("unchecked") - public boolean removeAllFromStore(@Nullable IgniteInternalTx tx, Collection<Object> keys) - throws IgniteCheckedException { - if (F.isEmpty(keys)) - return true; - - if (keys.size() == 1) { - Object key = keys.iterator().next(); - - return removeFromStore(tx, key); - } - - if (store != null) { - Collection<Object> keys0 = convertPortable ? cctx.unwrapPortablesIfNeeded(keys, false) : keys; - - if (log.isDebugEnabled()) - log.debug("Removing values from cache store [keys=" + keys0 + ']'); - - initSession(tx); - - boolean thewEx = true; - - try { - store.deleteAll(keys0); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (Exception e) { - if (!(e instanceof CacheWriterException)) - e = new CacheWriterException(e); - - if (!keys0.isEmpty()) - throw new CacheStorePartialUpdateException(keys0, e); - - throw new IgniteCheckedException(e); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Removed values from cache store [keys=" + keys0 + ']'); - - return true; - } - - return false; - } - - /** - * @return Store. - */ - public CacheStore<Object, Object> store() { - return store; - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void forceFlush() throws IgniteCheckedException { - if (store instanceof GridCacheWriteBehindStore) - ((GridCacheWriteBehindStore)store).forceFlush(); - } - - /** - * @param tx Transaction. - * @param commit Commit. - * @throws IgniteCheckedException If failed. - */ - public void txEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException { - assert store != null; - - initSession(tx); - - try { - store.sessionEnd(commit); - } - finally { - if (sesHolder != null) { - sesHolder.set(null); - - tx.removeMeta(SES_ATTR); - } - } - } - - /** - * @param e Class cast exception. - * @throws IgniteCheckedException Thrown exception. - */ - private void handleClassCastException(ClassCastException e) throws IgniteCheckedException { - assert e != null; - - if (e.getMessage() != null) { - throw new IgniteCheckedException("Cache store must work with portable objects if portables are " + - "enabled for cache [cacheName=" + cctx.namex() + ']', e); - } - else - throw e; - } - - /** - * Clears session holder. - */ - void endSession(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException { - try { - if (tx == null) - store.sessionEnd(threwEx); - } - catch (Exception e) { - if (!threwEx) - throw U.cast(e); - } - finally { - if (sesHolder != null) - sesHolder.set(null); - } - } - - /** - * @param tx Current transaction. - */ - void initSession(@Nullable IgniteInternalTx tx) { - if (sesHolder == null) - return; - - assert sesHolder.get() == null; - - SessionData ses; - - if (tx != null) { - ses = tx.meta(SES_ATTR); - - if (ses == null) { - ses = new SessionData(tx, cctx.name()); - - tx.addMeta(SES_ATTR, ses); - } - else - // Session cache name may change in cross-cache transaction. - ses.cacheName(cctx.name()); - } - else - ses = new SessionData(null, cctx.name()); - - sesHolder.set(ses); - } - - /** - * - */ - private static class SessionData { - /** */ - @GridToStringExclude - private final IgniteInternalTx tx; - - /** */ - private String cacheName; - - /** */ - @GridToStringInclude - private Map<Object, Object> props; - - /** - * @param tx Current transaction. - * @param cacheName Cache name. - */ - private SessionData(@Nullable IgniteInternalTx tx, @Nullable String cacheName) { - this.tx = tx; - this.cacheName = cacheName; - } - - /** - * @return Transaction. - */ - @Nullable private Transaction transaction() { - return tx != null ? tx.proxy() : null; - } - - /** - * @return Properties. - */ - private Map<Object, Object> properties() { - if (props == null) - props = new GridLeanMap<>(); - - return props; - } - - /** - * @return Cache name. - */ - private String cacheName() { - return cacheName; - } - - /** - * @param cacheName Cache name. - */ - private void cacheName(String cacheName) { - this.cacheName = cacheName; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(SessionData.class, this, "tx", CU.txString(tx)); - } - } - - /** - * - */ - private static class ThreadLocalSession implements CacheStoreSession { - /** */ - private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>(); - - /** {@inheritDoc} */ - @Nullable @Override public Transaction transaction() { - SessionData ses0 = sesHolder.get(); - - return ses0 != null ? ses0.transaction() : null; - } - - /** {@inheritDoc} */ - @Override public boolean isWithinTransaction() { - return transaction() != null; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public <K1, V1> Map<K1, V1> properties() { - SessionData ses0 = sesHolder.get(); - - return ses0 != null ? (Map<K1, V1>)ses0.properties() : null; - } - - /** {@inheritDoc} */ - @Nullable @Override public String cacheName() { - SessionData ses0 = sesHolder.get(); - - return ses0 != null ? ses0.cacheName() : null; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ThreadLocalSession.class, this); - } - } - - /** - * - */ - @SuppressWarnings("unchecked") - private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> { - /** */ - private final Map<?, IgniteBiTuple<?, GridCacheVersion>> map; - - /** */ - private Set<Object> rmvd; - - /** */ - private boolean cleared; - - /** - * @param map Map. - */ - private EntriesView(Map<?, IgniteBiTuple<?, GridCacheVersion>> map) { - assert map != null; - - this.map = map; - } - - /** {@inheritDoc} */ - @Override public int size() { - return cleared ? 0 : (map.size() - (rmvd != null ? rmvd.size() : 0)); - } - - /** {@inheritDoc} */ - @Override public boolean isEmpty() { - return cleared || !iterator().hasNext(); - } - - /** {@inheritDoc} */ - @Override public boolean contains(Object o) { - if (cleared || !(o instanceof Cache.Entry)) - return false; - - Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o; - - return map.containsKey(e.getKey()); - } - - /** {@inheritDoc} */ - @NotNull @Override public Iterator<Cache.Entry<?, ?>> iterator() { - if (cleared) - return F.emptyIterator(); - - final Iterator<Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>>> it0 = (Iterator)map.entrySet().iterator(); - - return new Iterator<Cache.Entry<?, ?>>() { - /** */ - private Cache.Entry<?, ?> cur; - - /** */ - private Cache.Entry<?, ?> next; - - /** - * - */ - { - checkNext(); - } - - /** - * - */ - private void checkNext() { - while (it0.hasNext()) { - Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>> e = it0.next(); - - Object k = e.getKey(); - - if (rmvd != null && rmvd.contains(k)) - continue; - - Object v = locStore ? e.getValue() : e.getValue().get1(); - - if (convertPortable) { - k = cctx.unwrapPortableIfNeeded(k, false); - v = cctx.unwrapPortableIfNeeded(v, false); - } - - next = new CacheEntryImpl<>(k, v); - - break; - } - } - - @Override public boolean hasNext() { - return next != null; - } - - @Override public Cache.Entry<?, ?> next() { - if (next == null) - throw new NoSuchElementException(); - - cur = next; - - next = null; - - checkNext(); - - return cur; - } - - @Override public void remove() { - if (cur == null) - throw new IllegalStateException(); - - addRemoved(cur); - - cur = null; - } - }; - } - - /** {@inheritDoc} */ - @Override public boolean add(Cache.Entry<?, ?> entry) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public boolean addAll(Collection<? extends Cache.Entry<?, ?>> col) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public boolean remove(Object o) { - if (cleared || !(o instanceof Cache.Entry)) - return false; - - Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o; - - if (rmvd != null && rmvd.contains(e.getKey())) - return false; - - if (mapContains(e)) { - addRemoved(e); - - return true; - } - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean containsAll(Collection<?> col) { - if (cleared) - return false; - - for (Object o : col) { - if (contains(o)) - return false; - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean removeAll(Collection<?> col) { - if (cleared) - return false; - - boolean modified = false; - - for (Object o : col) { - if (remove(o)) - modified = true; - } - - return modified; - } - - /** {@inheritDoc} */ - @Override public boolean retainAll(Collection<?> col) { - if (cleared) - return false; - - boolean modified = false; - - for (Cache.Entry<?, ?> e : this) { - if (!col.contains(e)) { - addRemoved(e); - - modified = true; - } - } - - return modified; - } - - /** {@inheritDoc} */ - @Override public void clear() { - cleared = true; - } - - /** - * @param e Entry. - */ - private void addRemoved(Cache.Entry<?, ?> e) { - if (rmvd == null) - rmvd = new HashSet<>(); - - rmvd.add(e.getKey()); - } - - /** - * @param e Entry. - * @return {@code True} if original map contains entry. - */ - private boolean mapContains(Cache.Entry<?, ?> e) { - return map.containsKey(e.getKey()); - } - - /** {@inheritDoc} */ - public String toString() { - Iterator<Cache.Entry<?, ?>> it = iterator(); - - if (!it.hasNext()) - return "[]"; - - SB sb = new SB("["); - - while (true) { - Cache.Entry<?, ?> e = it.next(); - - sb.a(e.toString()); - - if (!it.hasNext()) - return sb.a(']').toString(); - - sb.a(", "); - } - } - } -}