# GG-9973: WIP.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/29377e9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/29377e9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/29377e9e Branch: refs/heads/ignite-gg-9973 Commit: 29377e9e8417b1913e075460368d7ca022bec1c8 Parents: 9b23109 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Thu Apr 2 11:44:32 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Thu Apr 2 11:44:32 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheMetricsImpl.java | 1 + .../cache/CacheStorePartialUpdateException.java | 2 +- .../processors/cache/GridCacheContext.java | 7 +- .../processors/cache/GridCacheProcessor.java | 3 +- .../processors/cache/GridCacheStoreManager.java | 1202 ------------------ .../cache/GridCacheWriteBehindStore.java | 1015 --------------- .../cache/store/GridCacheOsStoreManager.java | 1202 ++++++++++++++++++ .../cache/store/GridCacheWriteBehindStore.java | 1015 +++++++++++++++ .../cache/transactions/IgniteTxAdapter.java | 5 +- .../transactions/IgniteTxLocalAdapter.java | 9 +- .../resources/META-INF/classnames.properties | 14 +- .../cache/GridCachePartitionedWritesTest.java | 3 +- ...idCacheWriteBehindStoreAbstractSelfTest.java | 189 --- .../GridCacheWriteBehindStoreAbstractTest.java | 349 ----- .../GridCacheWriteBehindStoreLocalTest.java | 30 - ...heWriteBehindStoreMultithreadedSelfTest.java | 163 --- ...BehindStorePartitionedMultiNodeSelfTest.java | 215 ---- ...ridCacheWriteBehindStorePartitionedTest.java | 30 - ...GridCacheWriteBehindStoreReplicatedTest.java | 30 - .../GridCacheWriteBehindStoreSelfTest.java | 267 ---- ...idCacheWriteBehindStoreAbstractSelfTest.java | 191 +++ .../GridCacheWriteBehindStoreAbstractTest.java | 350 +++++ .../GridCacheWriteBehindStoreLocalTest.java | 30 + ...heWriteBehindStoreMultithreadedSelfTest.java | 163 +++ ...BehindStorePartitionedMultiNodeSelfTest.java | 216 ++++ ...ridCacheWriteBehindStorePartitionedTest.java | 30 + ...GridCacheWriteBehindStoreReplicatedTest.java | 30 + .../GridCacheWriteBehindStoreSelfTest.java | 268 ++++ .../loadtests/hashmap/GridCacheTestContext.java | 3 +- .../IgniteCacheWriteBehindTestSuite.java | 3 +- 30 files changed, 3523 insertions(+), 3512 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 446070e..deebab4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStorePartialUpdateException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStorePartialUpdateException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStorePartialUpdateException.java index 6a364da..f7fca59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStorePartialUpdateException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStorePartialUpdateException.java @@ -36,7 +36,7 @@ public class CacheStorePartialUpdateException extends IgniteCheckedException { * @param cause Cause. */ @SuppressWarnings("unchecked") - CacheStorePartialUpdateException(Collection failedKeys, Exception cause) { + public CacheStorePartialUpdateException(Collection failedKeys, Exception cause) { super(cause); this.failedKeys = failedKeys; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index e7f3ad4..28ec91f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.jta.*; import org.apache.ignite.internal.processors.cache.local.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.cache.query.continuous.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.cacheobject.*; @@ -127,7 +128,7 @@ public class GridCacheContext<K, V> implements Externalizable { private GridCacheTtlManager ttlMgr; /** Store manager. */ - private GridCacheStoreManager storeMgr; + private GridCacheOsStoreManager storeMgr; /** Replication manager. */ private GridCacheDrManager drMgr; @@ -233,7 +234,7 @@ public class GridCacheContext<K, V> implements Externalizable { GridCacheEventManager evtMgr, GridCacheSwapManager swapMgr, - GridCacheStoreManager storeMgr, + GridCacheOsStoreManager storeMgr, GridCacheEvictionManager evictMgr, GridCacheQueryManager<K, V> qryMgr, CacheContinuousQueryManager contQryMgr, @@ -951,7 +952,7 @@ public class GridCacheContext<K, V> implements Externalizable { /** * @return Store manager. */ - public GridCacheStoreManager store() { + public GridCacheOsStoreManager store() { return storeMgr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 00852fc..5761bdb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.local.*; import org.apache.ignite.internal.processors.cache.local.atomic.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.cache.query.continuous.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.plugin.*; @@ -1023,7 +1024,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConflictResolutionManager rslvrMgr = pluginMgr.createComponent(CacheConflictResolutionManager.class); GridCacheDrManager drMgr = pluginMgr.createComponent(GridCacheDrManager.class); - GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, sesHolders, cfgStore, cfg); + GridCacheOsStoreManager storeMgr = new GridCacheOsStoreManager(ctx, sesHolders, cfgStore, cfg); GridCacheContext<?, ?> cacheCtx = new GridCacheContext( ctx, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java deleted file mode 100644 index d30aa06..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java +++ /dev/null @@ -1,1202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.store.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.lifecycle.*; -import org.apache.ignite.transactions.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.integration.*; -import java.util.*; - -/** - * Store manager. - */ -@SuppressWarnings("AssignmentToCatchBlockParameter") -public class GridCacheStoreManager extends GridCacheManagerAdapter { - /** */ - private static final UUID SES_ATTR = UUID.randomUUID(); - - /** */ - private final CacheStore<Object, Object> store; - - /** */ - private final CacheStore<?, ?> cfgStore; - - /** */ - private final CacheStoreBalancingWrapper<Object, Object> singleThreadGate; - - /** */ - private final ThreadLocal<SessionData> sesHolder; - - /** */ - private final boolean locStore; - - /** */ - private final boolean writeThrough; - - /** */ - private boolean convertPortable; - - /** - * @param ctx Kernal context. - * @param sesHolders Session holders map to use the same session holder for different managers if they use - * the same store instance. - * @param cfgStore Store provided in configuration. - * @param cfg Cache configuration. - * @throws IgniteCheckedException In case of error. - */ - @SuppressWarnings("unchecked") - public GridCacheStoreManager( - GridKernalContext ctx, - Map<CacheStore, ThreadLocal> sesHolders, - @Nullable CacheStore<Object, Object> cfgStore, - CacheConfiguration cfg - ) throws IgniteCheckedException { - this.cfgStore = cfgStore; - - store = cacheStoreWrapper(ctx, cfgStore, cfg); - - singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store); - - writeThrough = cfg.isWriteThrough(); - - ThreadLocal<SessionData> sesHolder0 = null; - - if (cfgStore != null) { - sesHolder0 = sesHolders.get(cfgStore); - - if (sesHolder0 == null) { - ThreadLocalSession locSes = new ThreadLocalSession(); - - if (ctx.resource().injectStoreSession(cfgStore, locSes)) { - sesHolder0 = locSes.sesHolder; - - sesHolders.put(cfgStore, sesHolder0); - } - } - } - - sesHolder = sesHolder0; - - locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class); - } - - /** - * @return {@code True} is write-through is enabled. - */ - public boolean writeThrough() { - return writeThrough; - } - - /** - * @return Unwrapped store provided in configuration. - */ - public CacheStore<?, ?> configuredStore() { - return cfgStore; - } - - /** - * Creates a wrapped cache store if write-behind cache is configured. - * - * @param ctx Kernal context. - * @param cfgStore Store provided in configuration. - * @param cfg Cache configuration. - * @return Instance if {@link GridCacheWriteBehindStore} if write-behind store is configured, - * or user-defined cache store. - */ - @SuppressWarnings({"unchecked"}) - private CacheStore cacheStoreWrapper(GridKernalContext ctx, - @Nullable CacheStore cfgStore, - CacheConfiguration cfg) { - if (cfgStore == null || !cfg.isWriteBehindEnabled()) - return cfgStore; - - GridCacheWriteBehindStore store = new GridCacheWriteBehindStore(this, - ctx.gridName(), - cfg.getName(), - ctx.log(GridCacheWriteBehindStore.class), - cfgStore); - - store.setFlushSize(cfg.getWriteBehindFlushSize()); - store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount()); - store.setFlushFrequency(cfg.getWriteBehindFlushFrequency()); - store.setBatchSize(cfg.getWriteBehindBatchSize()); - - return store; - } - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - if (store instanceof LifecycleAware) { - try { - // Avoid second start() call on store in case when near cache is enabled. - if (cctx.config().isWriteBehindEnabled()) { - if (!cctx.isNear()) - ((LifecycleAware)store).start(); - } - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to start cache store: " + e, e); - } - } - - convertPortable = !cctx.cacheObjects().keepPortableInStore(cctx.name()); - } - - /** {@inheritDoc} */ - @Override protected void stop0(boolean cancel) { - if (store instanceof LifecycleAware) { - try { - // Avoid second start() call on store in case when near cache is enabled. - if (cctx.config().isWriteBehindEnabled()) { - if (!cctx.isNear()) - ((LifecycleAware)store).stop(); - } - } - catch (Exception e) { - U.error(log(), "Failed to stop cache store.", e); - } - } - } - - /** - * @return Convert-portable flag. - */ - public boolean convertPortable() { - return convertPortable; - } - - /** - * @param convertPortable Convert-portable flag. - */ - public void convertPortable(boolean convertPortable) { - this.convertPortable = convertPortable; - } - - /** - * @return {@code true} If local store is configured. - */ - public boolean isLocalStore() { - return locStore; - } - - /** - * @return {@code true} If store configured. - */ - public boolean configured() { - return store != null; - } - - /** - * Loads data from persistent store. - * - * @param tx Cache transaction. - * @param key Cache key. - * @return Loaded value, possibly <tt>null</tt>. - * @throws IgniteCheckedException If data loading failed. - */ - @SuppressWarnings("unchecked") - @Nullable public Object loadFromStore(@Nullable IgniteInternalTx tx, KeyCacheObject key) - throws IgniteCheckedException { - return loadFromStore(tx, key, true); - } - - /** - * Loads data from persistent store. - * - * @param tx Cache transaction. - * @param key Cache key. - * @param convert Convert flag. - * @return Loaded value, possibly <tt>null</tt>. - * @throws IgniteCheckedException If data loading failed. - */ - @SuppressWarnings("unchecked") - @Nullable private Object loadFromStore(@Nullable IgniteInternalTx tx, - KeyCacheObject key, - boolean convert) - throws IgniteCheckedException { - if (store != null) { - if (key.internal()) - // Never load internal keys from store as they are never persisted. - return null; - - Object storeKey = key.value(cctx.cacheObjectContext(), false); - - if (convertPortable) - storeKey = cctx.unwrapPortableIfNeeded(storeKey, false); - - if (log.isDebugEnabled()) - log.debug("Loading value from store for key: " + storeKey); - - initSession(tx); - - boolean thewEx = true; - - Object val = null; - - try { - val = singleThreadGate.load(storeKey); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (CacheLoaderException e) { - throw new IgniteCheckedException(e); - } - catch (Exception e) { - throw new IgniteCheckedException(new CacheLoaderException(e)); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Loaded value from store [key=" + key + ", val=" + val + ']'); - - if (convert) { - val = convert(val); - - return val; - } - else - return val; - } - - return null; - } - - /** - * @param val Internal value. - * @return User value. - */ - @SuppressWarnings("unchecked") - private Object convert(Object val) { - if (val == null) - return null; - - return locStore ? ((IgniteBiTuple<Object, GridCacheVersion>)val).get1() : val; - } - - /** - * @return Whether DHT transaction can write to store from DHT. - */ - public boolean writeToStoreFromDht() { - return cctx.config().isWriteBehindEnabled() || locStore; - } - - /** - * @param tx Cache transaction. - * @param keys Cache keys. - * @param vis Closure to apply for loaded elements. - * @throws IgniteCheckedException If data loading failed. - */ - public void localStoreLoadAll(@Nullable IgniteInternalTx tx, - Collection<? extends KeyCacheObject> keys, - final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis) - throws IgniteCheckedException { - assert store != null; - assert locStore; - - loadAllFromStore(tx, keys, null, vis); - } - - /** - * Loads data from persistent store. - * - * @param tx Cache transaction. - * @param keys Cache keys. - * @param vis Closure. - * @return {@code True} if there is a persistent storage. - * @throws IgniteCheckedException If data loading failed. - */ - @SuppressWarnings({"unchecked"}) - public boolean loadAllFromStore(@Nullable IgniteInternalTx tx, - Collection<? extends KeyCacheObject> keys, - final IgniteBiInClosure<KeyCacheObject, Object> vis) throws IgniteCheckedException { - if (store != null) { - loadAllFromStore(tx, keys, vis, null); - - return true; - } - else { - for (KeyCacheObject key : keys) - vis.apply(key, null); - } - - return false; - } - - /** - * @param tx Cache transaction. - * @param keys Keys to load. - * @param vis Key/value closure (only one of vis or verVis can be specified). - * @param verVis Key/value/version closure (only one of vis or verVis can be specified). - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - private void loadAllFromStore(@Nullable IgniteInternalTx tx, - Collection<? extends KeyCacheObject> keys, - @Nullable final IgniteBiInClosure<KeyCacheObject, Object> vis, - @Nullable final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> verVis) - throws IgniteCheckedException { - assert vis != null ^ verVis != null; - assert verVis == null || locStore; - - final boolean convert = verVis == null; - - if (!keys.isEmpty()) { - if (keys.size() == 1) { - KeyCacheObject key = F.first(keys); - - if (convert) - vis.apply(key, loadFromStore(tx, key)); - else { - IgniteBiTuple<Object, GridCacheVersion> t = - (IgniteBiTuple<Object, GridCacheVersion>)loadFromStore(tx, key, false); - - if (t != null) - verVis.apply(key, t.get1(), t.get2()); - } - - return; - } - - Collection<Object> keys0; - - if (convertPortable) { - keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() { - @Override public Object apply(KeyCacheObject key) { - return cctx.unwrapPortableIfNeeded(key.value(cctx.cacheObjectContext(), false), false); - } - }); - } - else { - keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() { - @Override public Object apply(KeyCacheObject key) { - return key.value(cctx.cacheObjectContext(), false); - } - }); - } - - if (log.isDebugEnabled()) - log.debug("Loading values from store for keys: " + keys0); - - initSession(tx); - - boolean thewEx = true; - - try { - IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() { - @SuppressWarnings("ConstantConditions") - @Override public void apply(Object k, Object val) { - if (convert) { - Object v = convert(val); - - vis.apply(cctx.toCacheKeyObject(k), v); - } - else { - IgniteBiTuple<Object, GridCacheVersion> v = (IgniteBiTuple<Object, GridCacheVersion>)val; - - if (v != null) - verVis.apply(cctx.toCacheKeyObject(k), v.get1(), v.get2()); - } - } - }; - - if (keys.size() > singleThreadGate.loadAllThreshold()) { - Map<Object, Object> map = store.loadAll(keys0); - - if (map != null) { - for (Map.Entry<Object, Object> e : map.entrySet()) - c.apply(cctx.toCacheKeyObject(e.getKey()), e.getValue()); - } - } - else - singleThreadGate.loadAll(keys0, c); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (CacheLoaderException e) { - throw new IgniteCheckedException(e); - } - catch (Exception e) { - throw new IgniteCheckedException(new CacheLoaderException(e)); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Loaded values from store for keys: " + keys0); - } - } - - /** - * Loads data from persistent store. - * - * @param vis Closer to cache loaded elements. - * @param args User arguments. - * @return {@code True} if there is a persistent storage. - * @throws IgniteCheckedException If data loading failed. - */ - @SuppressWarnings({"ErrorNotRethrown", "unchecked"}) - public boolean loadCache(final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis, Object[] args) - throws IgniteCheckedException { - if (store != null) { - if (log.isDebugEnabled()) - log.debug("Loading all values from store."); - - initSession(null); - - boolean thewEx = true; - - try { - store.loadCache(new IgniteBiInClosure<Object, Object>() { - @Override public void apply(Object k, Object o) { - Object v; - GridCacheVersion ver = null; - - if (locStore) { - IgniteBiTuple<Object, GridCacheVersion> t = (IgniteBiTuple<Object, GridCacheVersion>)o; - - v = t.get1(); - ver = t.get2(); - } - else - v = o; - - KeyCacheObject cacheKey = cctx.toCacheKeyObject(k); - - vis.apply(cacheKey, v, ver); - } - }, args); - - thewEx = false; - } - catch (CacheLoaderException e) { - throw new IgniteCheckedException(e); - } - catch (Exception e) { - throw new IgniteCheckedException(new CacheLoaderException(e)); - } - finally { - endSession(null, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Loaded all values from store."); - - return true; - } - - LT.warn(log, null, "Calling Cache.loadCache() method will have no effect, " + - "CacheConfiguration.getStore() is not defined for cache: " + cctx.namexx()); - - return false; - } - - /** - * Puts key-value pair into storage. - * - * @param tx Cache transaction. - * @param key Key. - * @param val Value. - * @param ver Version. - * @return {@code true} If there is a persistent storage. - * @throws IgniteCheckedException If storage failed. - */ - @SuppressWarnings("unchecked") - public boolean putToStore(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver) - throws IgniteCheckedException { - if (store != null) { - // Never persist internal keys. - if (key instanceof GridCacheInternal) - return true; - - if (convertPortable) { - key = cctx.unwrapPortableIfNeeded(key, false); - val = cctx.unwrapPortableIfNeeded(val, false); - } - - if (log.isDebugEnabled()) - log.debug("Storing value in cache store [key=" + key + ", val=" + val + ']'); - - initSession(tx); - - boolean thewEx = true; - - try { - store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val)); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (CacheWriterException e) { - throw new IgniteCheckedException(e); - } - catch (Exception e) { - throw new IgniteCheckedException(new CacheWriterException(e)); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Stored value in cache store [key=" + key + ", val=" + val + ']'); - - return true; - } - - return false; - } - - /** - * Puts key-value pair into storage. - * - * @param tx Cache transaction. - * @param map Map. - * @return {@code True} if there is a persistent storage. - * @throws IgniteCheckedException If storage failed. - */ - public boolean putAllToStore(@Nullable IgniteInternalTx tx, - Map<Object, IgniteBiTuple<Object, GridCacheVersion>> map) - throws IgniteCheckedException - { - if (F.isEmpty(map)) - return true; - - if (map.size() == 1) { - Map.Entry<Object, IgniteBiTuple<Object, GridCacheVersion>> e = map.entrySet().iterator().next(); - - return putToStore(tx, e.getKey(), e.getValue().get1(), e.getValue().get2()); - } - else { - if (store != null) { - EntriesView entries = new EntriesView((Map)map); - - if (log.isDebugEnabled()) - log.debug("Storing values in cache store [entries=" + entries + ']'); - - initSession(tx); - - boolean thewEx = true; - - try { - store.writeAll(entries); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (Exception e) { - if (!(e instanceof CacheWriterException)) - e = new CacheWriterException(e); - - if (!entries.isEmpty()) { - List<Object> keys = new ArrayList<>(entries.size()); - - for (Cache.Entry<?, ?> entry : entries) - keys.add(entry.getKey()); - - throw new CacheStorePartialUpdateException(keys, e); - } - - throw new IgniteCheckedException(e); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Stored value in cache store [entries=" + entries + ']'); - - return true; - } - - return false; - } - } - - /** - * @param tx Cache transaction. - * @param key Key. - * @return {@code True} if there is a persistent storage. - * @throws IgniteCheckedException If storage failed. - */ - @SuppressWarnings("unchecked") - public boolean removeFromStore(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException { - if (store != null) { - // Never remove internal key from store as it is never persisted. - if (key instanceof GridCacheInternal) - return false; - - if (convertPortable) - key = cctx.unwrapPortableIfNeeded(key, false); - - if (log.isDebugEnabled()) - log.debug("Removing value from cache store [key=" + key + ']'); - - initSession(tx); - - boolean thewEx = true; - - try { - store.delete(key); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (CacheWriterException e) { - throw new IgniteCheckedException(e); - } - catch (Exception e) { - throw new IgniteCheckedException(new CacheWriterException(e)); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Removed value from cache store [key=" + key + ']'); - - return true; - } - - return false; - } - - /** - * @param tx Cache transaction. - * @param keys Key. - * @return {@code True} if there is a persistent storage. - * @throws IgniteCheckedException If storage failed. - */ - @SuppressWarnings("unchecked") - public boolean removeAllFromStore(@Nullable IgniteInternalTx tx, Collection<Object> keys) - throws IgniteCheckedException { - if (F.isEmpty(keys)) - return true; - - if (keys.size() == 1) { - Object key = keys.iterator().next(); - - return removeFromStore(tx, key); - } - - if (store != null) { - Collection<Object> keys0 = convertPortable ? cctx.unwrapPortablesIfNeeded(keys, false) : keys; - - if (log.isDebugEnabled()) - log.debug("Removing values from cache store [keys=" + keys0 + ']'); - - initSession(tx); - - boolean thewEx = true; - - try { - store.deleteAll(keys0); - - thewEx = false; - } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (Exception e) { - if (!(e instanceof CacheWriterException)) - e = new CacheWriterException(e); - - if (!keys0.isEmpty()) - throw new CacheStorePartialUpdateException(keys0, e); - - throw new IgniteCheckedException(e); - } - finally { - endSession(tx, thewEx); - } - - if (log.isDebugEnabled()) - log.debug("Removed values from cache store [keys=" + keys0 + ']'); - - return true; - } - - return false; - } - - /** - * @return Store. - */ - public CacheStore<Object, Object> store() { - return store; - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void forceFlush() throws IgniteCheckedException { - if (store instanceof GridCacheWriteBehindStore) - ((GridCacheWriteBehindStore)store).forceFlush(); - } - - /** - * @param tx Transaction. - * @param commit Commit. - * @throws IgniteCheckedException If failed. - */ - public void txEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException { - assert store != null; - - initSession(tx); - - try { - store.sessionEnd(commit); - } - finally { - if (sesHolder != null) { - sesHolder.set(null); - - tx.removeMeta(SES_ATTR); - } - } - } - - /** - * @param e Class cast exception. - * @throws IgniteCheckedException Thrown exception. - */ - private void handleClassCastException(ClassCastException e) throws IgniteCheckedException { - assert e != null; - - if (e.getMessage() != null) { - throw new IgniteCheckedException("Cache store must work with portable objects if portables are " + - "enabled for cache [cacheName=" + cctx.namex() + ']', e); - } - else - throw e; - } - - /** - * Clears session holder. - */ - void endSession(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException { - try { - if (tx == null) - store.sessionEnd(threwEx); - } - catch (Exception e) { - if (!threwEx) - throw U.cast(e); - } - finally { - if (sesHolder != null) - sesHolder.set(null); - } - } - - /** - * @param tx Current transaction. - */ - void initSession(@Nullable IgniteInternalTx tx) { - if (sesHolder == null) - return; - - assert sesHolder.get() == null; - - SessionData ses; - - if (tx != null) { - ses = tx.meta(SES_ATTR); - - if (ses == null) { - ses = new SessionData(tx, cctx.name()); - - tx.addMeta(SES_ATTR, ses); - } - else - // Session cache name may change in cross-cache transaction. - ses.cacheName(cctx.name()); - } - else - ses = new SessionData(null, cctx.name()); - - sesHolder.set(ses); - } - - /** - * - */ - private static class SessionData { - /** */ - @GridToStringExclude - private final IgniteInternalTx tx; - - /** */ - private String cacheName; - - /** */ - @GridToStringInclude - private Map<Object, Object> props; - - /** - * @param tx Current transaction. - * @param cacheName Cache name. - */ - private SessionData(@Nullable IgniteInternalTx tx, @Nullable String cacheName) { - this.tx = tx; - this.cacheName = cacheName; - } - - /** - * @return Transaction. - */ - @Nullable private Transaction transaction() { - return tx != null ? tx.proxy() : null; - } - - /** - * @return Properties. - */ - private Map<Object, Object> properties() { - if (props == null) - props = new GridLeanMap<>(); - - return props; - } - - /** - * @return Cache name. - */ - private String cacheName() { - return cacheName; - } - - /** - * @param cacheName Cache name. - */ - private void cacheName(String cacheName) { - this.cacheName = cacheName; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(SessionData.class, this, "tx", CU.txString(tx)); - } - } - - /** - * - */ - private static class ThreadLocalSession implements CacheStoreSession { - /** */ - private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>(); - - /** {@inheritDoc} */ - @Nullable @Override public Transaction transaction() { - SessionData ses0 = sesHolder.get(); - - return ses0 != null ? ses0.transaction() : null; - } - - /** {@inheritDoc} */ - @Override public boolean isWithinTransaction() { - return transaction() != null; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public <K1, V1> Map<K1, V1> properties() { - SessionData ses0 = sesHolder.get(); - - return ses0 != null ? (Map<K1, V1>)ses0.properties() : null; - } - - /** {@inheritDoc} */ - @Nullable @Override public String cacheName() { - SessionData ses0 = sesHolder.get(); - - return ses0 != null ? ses0.cacheName() : null; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ThreadLocalSession.class, this); - } - } - - /** - * - */ - @SuppressWarnings("unchecked") - private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> { - /** */ - private final Map<?, IgniteBiTuple<?, GridCacheVersion>> map; - - /** */ - private Set<Object> rmvd; - - /** */ - private boolean cleared; - - /** - * @param map Map. - */ - private EntriesView(Map<?, IgniteBiTuple<?, GridCacheVersion>> map) { - assert map != null; - - this.map = map; - } - - /** {@inheritDoc} */ - @Override public int size() { - return cleared ? 0 : (map.size() - (rmvd != null ? rmvd.size() : 0)); - } - - /** {@inheritDoc} */ - @Override public boolean isEmpty() { - return cleared || !iterator().hasNext(); - } - - /** {@inheritDoc} */ - @Override public boolean contains(Object o) { - if (cleared || !(o instanceof Cache.Entry)) - return false; - - Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o; - - return map.containsKey(e.getKey()); - } - - /** {@inheritDoc} */ - @NotNull @Override public Iterator<Cache.Entry<?, ?>> iterator() { - if (cleared) - return F.emptyIterator(); - - final Iterator<Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>>> it0 = (Iterator)map.entrySet().iterator(); - - return new Iterator<Cache.Entry<?, ?>>() { - /** */ - private Cache.Entry<?, ?> cur; - - /** */ - private Cache.Entry<?, ?> next; - - /** - * - */ - { - checkNext(); - } - - /** - * - */ - private void checkNext() { - while (it0.hasNext()) { - Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>> e = it0.next(); - - Object k = e.getKey(); - - if (rmvd != null && rmvd.contains(k)) - continue; - - Object v = locStore ? e.getValue() : e.getValue().get1(); - - if (convertPortable) { - k = cctx.unwrapPortableIfNeeded(k, false); - v = cctx.unwrapPortableIfNeeded(v, false); - } - - next = new CacheEntryImpl<>(k, v); - - break; - } - } - - @Override public boolean hasNext() { - return next != null; - } - - @Override public Cache.Entry<?, ?> next() { - if (next == null) - throw new NoSuchElementException(); - - cur = next; - - next = null; - - checkNext(); - - return cur; - } - - @Override public void remove() { - if (cur == null) - throw new IllegalStateException(); - - addRemoved(cur); - - cur = null; - } - }; - } - - /** {@inheritDoc} */ - @Override public boolean add(Cache.Entry<?, ?> entry) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public boolean addAll(Collection<? extends Cache.Entry<?, ?>> col) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public boolean remove(Object o) { - if (cleared || !(o instanceof Cache.Entry)) - return false; - - Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o; - - if (rmvd != null && rmvd.contains(e.getKey())) - return false; - - if (mapContains(e)) { - addRemoved(e); - - return true; - } - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean containsAll(Collection<?> col) { - if (cleared) - return false; - - for (Object o : col) { - if (contains(o)) - return false; - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean removeAll(Collection<?> col) { - if (cleared) - return false; - - boolean modified = false; - - for (Object o : col) { - if (remove(o)) - modified = true; - } - - return modified; - } - - /** {@inheritDoc} */ - @Override public boolean retainAll(Collection<?> col) { - if (cleared) - return false; - - boolean modified = false; - - for (Cache.Entry<?, ?> e : this) { - if (!col.contains(e)) { - addRemoved(e); - - modified = true; - } - } - - return modified; - } - - /** {@inheritDoc} */ - @Override public void clear() { - cleared = true; - } - - /** - * @param e Entry. - */ - private void addRemoved(Cache.Entry<?, ?> e) { - if (rmvd == null) - rmvd = new HashSet<>(); - - rmvd.add(e.getKey()); - } - - /** - * @param e Entry. - * @return {@code True} if original map contains entry. - */ - private boolean mapContains(Cache.Entry<?, ?> e) { - return map.containsKey(e.getKey()); - } - - /** {@inheritDoc} */ - public String toString() { - Iterator<Cache.Entry<?, ?>> it = iterator(); - - if (!it.hasNext()) - return "[]"; - - SB sb = new SB("["); - - while (true) { - Cache.Entry<?, ?> e = it.next(); - - sb.a(e.toString()); - - if (!it.hasNext()) - return sb.a(']').toString(); - - sb.a(", "); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java deleted file mode 100644 index c26c34e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java +++ /dev/null @@ -1,1015 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.lifecycle.*; -import org.apache.ignite.thread.*; -import org.jetbrains.annotations.*; -import org.jsr166.*; - -import javax.cache.integration.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -import static javax.cache.Cache.*; - -/** - * Internal wrapper for a {@link CacheStore} that enables write-behind logic. - * <p/> - * The general purpose of this approach is to reduce cache store load under high - * store update rate. The idea is to cache all write and remove operations in a pending - * map and delegate these changes to the underlying store either after timeout or - * if size of a pending map exceeded some pre-configured value. Another performance gain - * is achieved due to combining a group of similar operations to a single batch update. - * <p/> - * The essential flush size for the write-behind cache should be at least the estimated - * count of simultaneously written keys. In case of significantly smaller value there would - * be triggered a lot of flush events that will result in a high cache store load. - * <p/> - * Since write operations to the cache store are deferred, transaction support is lost; no - * transaction objects are passed to the underlying store. - */ -public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware { - /** Default write cache initial capacity. */ - public static final int DFLT_INITIAL_CAPACITY = 1024; - - /** Overflow ratio for critical cache size calculation. */ - public static final float CACHE_OVERFLOW_RATIO = 1.5f; - - /** Default concurrency level of write cache. */ - public static final int DFLT_CONCUR_LVL = 64; - - /** Write cache initial capacity. */ - private int initCap = DFLT_INITIAL_CAPACITY; - - /** Concurrency level for write cache access. */ - private int concurLvl = DFLT_CONCUR_LVL; - - /** When cache size exceeds this value eldest entry will be stored to the underlying store. */ - private int cacheMaxSize = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE; - - /** Critical cache size. If cache size exceeds this value, data flush performed synchronously. */ - private int cacheCriticalSize; - - /** Count of worker threads performing underlying store updates. */ - private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT; - - /** Cache flush frequency. All pending operations will be performed in not less then this value ms. */ - private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY; - - /** Maximum batch size for put and remove operations */ - private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE; - - /** Grid name. */ - private String gridName; - - /** Cache name. */ - private String cacheName; - - /** Underlying store. */ - private CacheStore<K, V> store; - - /** Write cache. */ - private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache; - - /** Flusher threads. */ - private GridWorker[] flushThreads; - - /** Atomic flag indicating store shutdown. */ - private AtomicBoolean stopping = new AtomicBoolean(true); - - /** Flush lock. */ - private Lock flushLock = new ReentrantLock(); - - /** Condition to determine records available for flush. */ - private Condition canFlush = flushLock.newCondition(); - - /** Variable for counting total cache overflows. */ - private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger(); - - /** Variable contains current number of overflow events. */ - private AtomicInteger cacheOverflowCntr = new AtomicInteger(); - - /** Variable for counting key-value pairs that are in {@link ValueStatus#RETRY} state. */ - private AtomicInteger retryEntriesCnt = new AtomicInteger(); - - /** Log. */ - private IgniteLogger log; - - /** Store manager. */ - private GridCacheStoreManager storeMgr; - - /** - * Creates a write-behind cache store for the given store. - * - * @param storeMgr Store manager. - * @param gridName Grid name. - * @param cacheName Cache name. - * @param log Grid logger. - * @param store {@code GridCacheStore} that need to be wrapped. - */ - public GridCacheWriteBehindStore( - GridCacheStoreManager storeMgr, - String gridName, - String cacheName, - IgniteLogger log, - CacheStore<K, V> store) { - this.storeMgr = storeMgr; - this.gridName = gridName; - this.cacheName = cacheName; - this.log = log; - this.store = store; - } - - /** - * Sets initial capacity for the write cache. - * - * @param initCap Initial capacity. - */ - public void setInitialCapacity(int initCap) { - this.initCap = initCap; - } - - /** - * Sets concurrency level for the write cache. Concurrency level is expected count of concurrent threads - * attempting to update cache. - * - * @param concurLvl Concurrency level. - */ - public void setConcurrencyLevel(int concurLvl) { - this.concurLvl = concurLvl; - } - - /** - * Sets the maximum size of the write cache. When the count of unique keys in write cache exceeds this value, - * the eldest entry in the cache is immediately scheduled for write to the underlying store. - * - * @param cacheMaxSize Max cache size. - */ - public void setFlushSize(int cacheMaxSize) { - this.cacheMaxSize = cacheMaxSize; - } - - /** - * Gets the maximum size of the write-behind buffer. When the count of unique keys - * in write buffer exceeds this value, the buffer is scheduled for write to the underlying store. - * <p/> - * If this value is {@code 0}, then flush is performed only on time-elapsing basis. However, - * when this value is {@code 0}, the cache critical size is set to - * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE} - * - * @return Buffer size that triggers flush procedure. - */ - public int getWriteBehindFlushSize() { - return cacheMaxSize; - } - - /** - * Sets the number of threads that will perform store update operations. - * - * @param flushThreadCnt Count of worker threads. - */ - public void setFlushThreadCount(int flushThreadCnt) { - this.flushThreadCnt = flushThreadCnt; - } - - /** - * Gets the number of flush threads that will perform store update operations. - * - * @return Count of worker threads. - */ - public int getWriteBehindFlushThreadCount() { - return flushThreadCnt; - } - - /** - * Sets the cache flush frequency. All pending operations on the underlying store will be performed - * within time interval not less then this value. - * - * @param cacheFlushFreq Time interval value in milliseconds. - */ - public void setFlushFrequency(long cacheFlushFreq) { - this.cacheFlushFreq = cacheFlushFreq; - } - - /** - * Gets the cache flush frequency. All pending operations on the underlying store will be performed - * within time interval not less then this value. - * <p/> - * If this value is {@code 0}, then flush is performed only when buffer size exceeds flush size. - * - * @return Flush frequency in milliseconds. - */ - public long getWriteBehindFlushFrequency() { - return cacheFlushFreq; - } - - /** - * Sets the maximum count of similar operations that can be grouped to a single batch. - * - * @param batchSize Maximum count of batch. - */ - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - /** - * Gets the maximum count of similar (put or remove) operations that can be grouped to a single batch. - * - * @return Maximum size of batch. - */ - public int getWriteBehindStoreBatchSize() { - return batchSize; - } - - /** - * Gets count of entries that were processed by the write-behind store and have not been - * flushed to the underlying store yet. - * - * @return Total count of entries in cache store internal buffer. - */ - public int getWriteBehindBufferSize() { - return writeCache.sizex(); - } - - /** - * @return Underlying store. - */ - public CacheStore<K, V> store() { - return store; - } - - /** - * Performs all the initialization logic for write-behind cache store. - * This class must not be used until this method returns. - */ - @Override public void start() { - assert cacheFlushFreq != 0 || cacheMaxSize != 0; - - if (stopping.compareAndSet(true, false)) { - if (log.isDebugEnabled()) - log.debug("Starting write-behind store for cache '" + cacheName + '\''); - - cacheCriticalSize = (int)(cacheMaxSize * CACHE_OVERFLOW_RATIO); - - if (cacheCriticalSize == 0) - cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE; - - flushThreads = new GridWorker[flushThreadCnt]; - - writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl); - - for (int i = 0; i < flushThreads.length; i++) { - flushThreads[i] = new Flusher(gridName, "flusher-" + i, log); - - new IgniteThread(flushThreads[i]).start(); - } - } - } - - /** - * Gets count of write buffer overflow events since initialization. Each overflow event causes - * the ongoing flush operation to be performed synchronously. - * - * @return Count of cache overflow events since start. - */ - public int getWriteBehindTotalCriticalOverflowCount() { - return cacheTotalOverflowCntr.get(); - } - - /** - * Gets count of write buffer overflow events in progress at the moment. Each overflow event causes - * the ongoing flush operation to be performed synchronously. - * - * @return Count of cache overflow events since start. - */ - public int getWriteBehindCriticalOverflowCount() { - return cacheOverflowCntr.get(); - } - - /** - * Gets count of cache entries that are in a store-retry state. An entry is assigned a store-retry state - * when underlying store failed due some reason and cache has enough space to retain this entry till - * the next try. - * - * @return Count of entries in store-retry state. - */ - public int getWriteBehindErrorRetryCount() { - return retryEntriesCnt.get(); - } - - /** - * Performs shutdown logic for store. No put, get and remove requests will be processed after - * this method is called. - */ - @Override public void stop() { - if (stopping.compareAndSet(false, true)) { - if (log.isDebugEnabled()) - log.debug("Stopping write-behind store for cache '" + cacheName + '\''); - - wakeUp(); - - boolean graceful = true; - - for (GridWorker worker : flushThreads) - graceful &= U.join(worker, log); - - if (!graceful) - log.warning("Shutdown was aborted"); - } - } - - /** - * Forces all entries collected to be flushed to the underlying store. - * @throws IgniteCheckedException If failed. - */ - public void forceFlush() throws IgniteCheckedException { - wakeUp(); - } - - /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) { - store.loadCache(clo, args); - } - - /** {@inheritDoc} */ - @Override public Map<K, V> loadAll(Iterable<? extends K> keys) { - if (log.isDebugEnabled()) - log.debug("Store load all [keys=" + keys + ']'); - - Map<K, V> loaded = new HashMap<>(); - - Collection<K> remaining = new LinkedList<>(); - - for (K key : keys) { - StatefulValue<K, V> val = writeCache.get(key); - - if (val != null) { - val.readLock().lock(); - - try { - if (val.operation() == StoreOperation.PUT) - loaded.put(key, val.entry().getValue()); - else - assert val.operation() == StoreOperation.RMV : val.operation(); - } - finally { - val.readLock().unlock(); - } - } - else - remaining.add(key); - } - - // For items that were not found in queue. - if (!remaining.isEmpty()) { - Map<K, V> loaded0 = store.loadAll(remaining); - - if (loaded0 != null) - loaded.putAll(loaded0); - } - - return loaded; - } - - /** {@inheritDoc} */ - @Override public V load(K key) { - if (log.isDebugEnabled()) - log.debug("Store load [key=" + key + ']'); - - StatefulValue<K, V> val = writeCache.get(key); - - if (val != null) { - val.readLock().lock(); - - try { - switch (val.operation()) { - case PUT: - return val.entry().getValue(); - - case RMV: - return null; - - default: - assert false : "Unexpected operation: " + val.status(); - } - } - finally { - val.readLock().unlock(); - } - } - - return store.load(key); - } - - /** {@inheritDoc} */ - @Override public void writeAll(Collection<Entry<? extends K, ? extends V>> entries) { - for (Entry<? extends K, ? extends V> e : entries) - write(e); - } - - /** {@inheritDoc} */ - @Override public void write(Entry<? extends K, ? extends V> entry) { - try { - if (log.isDebugEnabled()) - log.debug("Store put [key=" + entry.getKey() + ", val=" + entry.getValue() + ']'); - - updateCache(entry.getKey(), entry, StoreOperation.PUT); - } - catch (IgniteInterruptedCheckedException e) { - throw new CacheWriterException(U.convertExceptionNoWrap(e)); - } - } - - /** {@inheritDoc} */ - @Override public void deleteAll(Collection<?> keys) { - for (Object key : keys) - delete(key); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void delete(Object key) { - try { - if (log.isDebugEnabled()) - log.debug("Store remove [key=" + key + ']'); - - updateCache((K)key, null, StoreOperation.RMV); - } - catch (IgniteInterruptedCheckedException e) { - throw new CacheWriterException(U.convertExceptionNoWrap(e)); - } - } - - /** {@inheritDoc} */ - @Override public void sessionEnd(boolean commit) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheWriteBehindStore.class, this); - } - - /** - * Performs flush-consistent cache update for the given key. - * - * @param key Key for which update is performed. - * @param val New value, may be null for remove operation. - * @param operation Updated value status - * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed. - */ - private void updateCache(K key, - @Nullable Entry<? extends K, ? extends V> val, - StoreOperation operation) - throws IgniteInterruptedCheckedException { - StatefulValue<K, V> newVal = new StatefulValue<>(val, operation); - - StatefulValue<K, V> prev; - - while ((prev = writeCache.putIfAbsent(key, newVal)) != null) { - prev.writeLock().lock(); - - try { - if (prev.status() == ValueStatus.PENDING) { - // Flush process in progress, try again. - prev.waitForFlush(); - - continue; - } - else if (prev.status() == ValueStatus.FLUSHED) - // This entry was deleted from map before we acquired the lock. - continue; - else if (prev.status() == ValueStatus.RETRY) - // New value has come, old value is no longer in RETRY state, - retryEntriesCnt.decrementAndGet(); - - assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY; - - prev.update(val, operation, ValueStatus.NEW); - - break; - } - finally { - prev.writeLock().unlock(); - } - } - - // Now check the map size - if (writeCache.sizex() > cacheCriticalSize) - // Perform single store update in the same thread. - flushSingleValue(); - else if (cacheMaxSize > 0 && writeCache.sizex() > cacheMaxSize) - wakeUp(); - } - - /** - * Flushes one upcoming value to the underlying store. Called from - * {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds - * critical size. - */ - private void flushSingleValue() { - cacheOverflowCntr.incrementAndGet(); - - try { - Map<K, StatefulValue<K, V>> batch = null; - - for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) { - StatefulValue<K, V> val = e.getValue(); - - val.writeLock().lock(); - - try { - ValueStatus status = val.status(); - - if (acquired(status)) - // Another thread is helping us, continue to the next entry. - continue; - - if (val.status() == ValueStatus.RETRY) - retryEntriesCnt.decrementAndGet(); - - assert retryEntriesCnt.get() >= 0; - - val.status(ValueStatus.PENDING); - - batch = Collections.singletonMap(e.getKey(), val); - } - finally { - val.writeLock().unlock(); - } - - if (!batch.isEmpty()) { - applyBatch(batch, false); - - cacheTotalOverflowCntr.incrementAndGet(); - - return; - } - } - } - finally { - cacheOverflowCntr.decrementAndGet(); - } - } - - /** - * Performs batch operation on underlying store. - * - * @param valMap Batch map. - * @param initSes {@code True} if need to initialize session. - */ - private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes) { - assert valMap.size() <= batchSize; - - StoreOperation operation = null; - - // Construct a map for underlying store - Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size()); - - for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) { - if (operation == null) - operation = e.getValue().operation(); - - assert operation == e.getValue().operation(); - - assert e.getValue().status() == ValueStatus.PENDING; - - batch.put(e.getKey(), e.getValue().entry()); - } - - if (updateStore(operation, batch, initSes)) { - for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) { - StatefulValue<K, V> val = e.getValue(); - - val.writeLock().lock(); - - try { - val.status(ValueStatus.FLUSHED); - - StatefulValue<K, V> prev = writeCache.remove(e.getKey()); - - // Additional check to ensure consistency. - assert prev == val : "Map value for key " + e.getKey() + " was updated during flush"; - - val.signalFlushed(); - } - finally { - val.writeLock().unlock(); - } - } - } - else { - // Exception occurred, we must set RETRY status - for (StatefulValue<K, V> val : valMap.values()) { - val.writeLock().lock(); - - try { - val.status(ValueStatus.RETRY); - - retryEntriesCnt.incrementAndGet(); - - val.signalFlushed(); - } - finally { - val.writeLock().unlock(); - } - } - } - } - - /** - * Tries to update store with the given values and returns {@code true} in case of success. - * - * <p/> If any exception in underlying store is occurred, this method checks the map size. - * If map size exceeds some critical value, then it returns {@code true} and this value will - * be lost. If map size does not exceed critical value, it will return false and value will - * be retained in write cache. - * - * @param operation Status indicating operation that should be performed. - * @param vals Key-Value map. - * @param initSes {@code True} if need to initialize session. - * @return {@code true} if value may be deleted from the write cache, - * {@code false} otherwise - */ - private boolean updateStore(StoreOperation operation, - Map<K, Entry<? extends K, ? extends V>> vals, - boolean initSes) { - - if (initSes && storeMgr != null) - storeMgr.initSession(null); - - try { - boolean threwEx = true; - - try { - switch (operation) { - case PUT: - store.writeAll(vals.values()); - - break; - - case RMV: - store.deleteAll(vals.keySet()); - - break; - - default: - assert false : "Unexpected operation: " + operation; - } - - threwEx = false; - - return true; - } - finally { - if (initSes && storeMgr != null) - storeMgr.endSession(null, threwEx); - } - } - catch (Exception e) { - LT.warn(log, e, "Unable to update underlying store: " + store); - - if (writeCache.sizex() > cacheCriticalSize || stopping.get()) { - for (Map.Entry<K, Entry<? extends K, ? extends V>> entry : vals.entrySet()) { - Object val = entry.getValue() != null ? entry.getValue().getValue() : null; - - log.warning("Failed to update store (value will be lost as current buffer size is greater " + - "than 'cacheCriticalSize' or node has been stopped before store was repaired) [key=" + - entry.getKey() + ", val=" + val + ", op=" + operation + "]"); - } - - return true; - } - - return false; - } - } - - /** - * Wakes up flushing threads if map size exceeded maximum value or in case of shutdown. - */ - private void wakeUp() { - flushLock.lock(); - - try { - canFlush.signalAll(); - } - finally { - flushLock.unlock(); - } - } - - /** - * Thread that performs time-based flushing of written values to the underlying storage. - */ - private class Flusher extends GridWorker { - /** {@inheritDoc */ - protected Flusher(String gridName, String name, IgniteLogger log) { - super(gridName, name, log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!stopping.get() || writeCache.sizex() > 0) { - awaitOperationsAvailable(); - - flushCache(writeCache.entrySet().iterator()); - } - } - - /** - * This method awaits until enough elements in map are available or given timeout is over. - * - * @throws InterruptedException If awaiting was interrupted. - */ - private void awaitOperationsAvailable() throws InterruptedException { - flushLock.lock(); - - try { - do { - if (writeCache.sizex() <= cacheMaxSize || cacheMaxSize == 0) { - if (cacheFlushFreq > 0) - canFlush.await(cacheFlushFreq, TimeUnit.MILLISECONDS); - else - canFlush.await(); - } - } - while (writeCache.sizex() == 0 && !stopping.get()); - } - finally { - flushLock.unlock(); - } - } - - /** - * Removes values from the write cache and performs corresponding operation - * on the underlying store. - * - * @param it Iterator for write cache. - */ - private void flushCache(Iterator<Map.Entry<K,StatefulValue<K, V>>> it) { - StoreOperation operation = null; - - Map<K, StatefulValue<K, V>> batch = null; - Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(batchSize); - - while (it.hasNext()) { - Map.Entry<K, StatefulValue<K, V>> e = it.next(); - - StatefulValue<K, V> val = e.getValue(); - - val.writeLock().lock(); - - try { - ValueStatus status = val.status(); - - if (acquired(status)) - // Another thread is helping us, continue to the next entry. - continue; - - if (status == ValueStatus.RETRY) - retryEntriesCnt.decrementAndGet(); - - assert retryEntriesCnt.get() >= 0; - - val.status(ValueStatus.PENDING); - - // We scan for the next operation and apply batch on operation change. Null means new batch. - if (operation == null) - operation = val.operation(); - - if (operation != val.operation()) { - // Operation is changed, so we need to perform a batch. - batch = pending; - pending = U.newLinkedHashMap(batchSize); - - operation = val.operation(); - - pending.put(e.getKey(), val); - } - else - pending.put(e.getKey(), val); - - if (pending.size() == batchSize) { - batch = pending; - pending = U.newLinkedHashMap(batchSize); - - operation = null; - } - } - finally { - val.writeLock().unlock(); - } - - if (batch != null && !batch.isEmpty()) { - applyBatch(batch, true); - batch = null; - } - } - - // Process the remainder. - if (!pending.isEmpty()) - applyBatch(pending, true); - } - } - - /** - * For test purposes only. - * - * @return Write cache for the underlying store operations. - */ - Map<K, StatefulValue<K, V>> writeCache() { - return writeCache; - } - - /** - * Enumeration that represents possible operations on the underlying store. - */ - private enum StoreOperation { - /** Put key-value pair to the underlying store. */ - PUT, - - /** Remove key from the underlying store. */ - RMV - } - - /** - * Enumeration that represents possible states of value in the map. - */ - private enum ValueStatus { - /** Value is scheduled for write or delete from the underlying cache but has not been captured by flusher. */ - NEW, - - /** Value is captured by flusher and store operation is performed at the moment. */ - PENDING, - - /** Store operation has failed and it will be re-tried at the next flush. */ - RETRY, - - /** Store operation succeeded and this value will be removed by flusher. */ - FLUSHED, - } - - /** - * Checks if given status indicates pending or complete flush operation. - * - * @param status Status to check. - * @return {@code true} if status indicates any pending or complete store update operation. - */ - private boolean acquired(ValueStatus status) { - return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED; - } - - /** - * A state-value-operation trio. - * - * @param <V> Value type. - */ - private static class StatefulValue<K, V> extends ReentrantReadWriteLock { - /** */ - private static final long serialVersionUID = 0L; - - /** Value. */ - @GridToStringInclude - private Entry<? extends K, ? extends V> val; - - /** Store operation. */ - private StoreOperation storeOperation; - - /** Value status. */ - private ValueStatus valStatus; - - /** Condition to wait for flush event */ - private Condition flushCond = writeLock().newCondition(); - - /** - * Creates a state-value pair with {@link ValueStatus#NEW} status. - * - * @param val Value. - * @param storeOperation Store operation. - */ - private StatefulValue(Entry<? extends K, ? extends V> val, StoreOperation storeOperation) { - assert storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV; - - this.val = val; - this.storeOperation = storeOperation; - valStatus = ValueStatus.NEW; - } - - /** - * @return Stored value. - */ - private Entry<? extends K, ? extends V> entry() { - return val; - } - - /** - * @return Store operation. - */ - private StoreOperation operation() { - return storeOperation; - } - - /** - * @return Value status - */ - private ValueStatus status() { - return valStatus; - } - - /** - * Updates value status. - * - * @param valStatus Value status. - */ - private void status(ValueStatus valStatus) { - this.valStatus = valStatus; - } - - /** - * Updates both value and value status. - * - * @param val Value. - * @param storeOperation Store operation. - * @param valStatus Value status. - */ - private void update(@Nullable Entry<? extends K, ? extends V> val, - StoreOperation storeOperation, - ValueStatus valStatus) { - this.val = val; - this.storeOperation = storeOperation; - this.valStatus = valStatus; - } - - /** - * Awaits a signal on flush condition - * - * @throws IgniteInterruptedCheckedException If thread was interrupted. - */ - private void waitForFlush() throws IgniteInterruptedCheckedException { - U.await(flushCond); - } - - /** - * Signals flush condition. - */ - @SuppressWarnings({"SignalWithoutCorrespondingAwait"}) - private void signalFlushed() { - flushCond.signalAll(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof StatefulValue)) - return false; - - StatefulValue other = (StatefulValue)o; - - return F.eq(val, other.val) && F.eq(valStatus, other.valStatus); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = val != null ? val.hashCode() : 0; - - res = 31 * res + valStatus.hashCode(); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StatefulValue.class, this); - } - } -}