http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java new file mode 100644 index 0000000..9fdbd8e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheOsStoreManager.java @@ -0,0 +1,1202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.integration.*; +import java.util.*; + +/** + * Store manager. + */ +@SuppressWarnings("AssignmentToCatchBlockParameter") +public class GridCacheOsStoreManager extends GridCacheManagerAdapter { + /** */ + private static final UUID SES_ATTR = UUID.randomUUID(); + + /** */ + private final CacheStore<Object, Object> store; + + /** */ + private final CacheStore<?, ?> cfgStore; + + /** */ + private final CacheStoreBalancingWrapper<Object, Object> singleThreadGate; + + /** */ + private final ThreadLocal<SessionData> sesHolder; + + /** */ + private final boolean locStore; + + /** */ + private final boolean writeThrough; + + /** */ + private boolean convertPortable; + + /** + * @param ctx Kernal context. + * @param sesHolders Session holders map to use the same session holder for different managers if they use + * the same store instance. + * @param cfgStore Store provided in configuration. + * @param cfg Cache configuration. + * @throws IgniteCheckedException In case of error. 
+ */ + @SuppressWarnings("unchecked") + public GridCacheOsStoreManager( + GridKernalContext ctx, + Map<CacheStore, ThreadLocal> sesHolders, + @Nullable CacheStore<Object, Object> cfgStore, + CacheConfiguration cfg + ) throws IgniteCheckedException { + this.cfgStore = cfgStore; + + store = cacheStoreWrapper(ctx, cfgStore, cfg); + + singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store); + + writeThrough = cfg.isWriteThrough(); + + ThreadLocal<SessionData> sesHolder0 = null; + + if (cfgStore != null) { + sesHolder0 = sesHolders.get(cfgStore); + + if (sesHolder0 == null) { + ThreadLocalSession locSes = new ThreadLocalSession(); + + if (ctx.resource().injectStoreSession(cfgStore, locSes)) { + sesHolder0 = locSes.sesHolder; + + sesHolders.put(cfgStore, sesHolder0); + } + } + } + + sesHolder = sesHolder0; + + locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class); + } + + /** + * @return {@code True} is write-through is enabled. + */ + public boolean writeThrough() { + return writeThrough; + } + + /** + * @return Unwrapped store provided in configuration. + */ + public CacheStore<?, ?> configuredStore() { + return cfgStore; + } + + /** + * Creates a wrapped cache store if write-behind cache is configured. + * + * @param ctx Kernal context. + * @param cfgStore Store provided in configuration. + * @param cfg Cache configuration. + * @return Instance if {@link GridCacheWriteBehindStore} if write-behind store is configured, + * or user-defined cache store. 
+ */ + @SuppressWarnings({"unchecked"}) + private CacheStore cacheStoreWrapper(GridKernalContext ctx, + @Nullable CacheStore cfgStore, + CacheConfiguration cfg) { + if (cfgStore == null || !cfg.isWriteBehindEnabled()) + return cfgStore; + + GridCacheWriteBehindStore store = new GridCacheWriteBehindStore(this, + ctx.gridName(), + cfg.getName(), + ctx.log(GridCacheWriteBehindStore.class), + cfgStore); + + store.setFlushSize(cfg.getWriteBehindFlushSize()); + store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount()); + store.setFlushFrequency(cfg.getWriteBehindFlushFrequency()); + store.setBatchSize(cfg.getWriteBehindBatchSize()); + + return store; + } + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + if (store instanceof LifecycleAware) { + try { + // Avoid second start() call on store in case when near cache is enabled. + if (cctx.config().isWriteBehindEnabled()) { + if (!cctx.isNear()) + ((LifecycleAware)store).start(); + } + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to start cache store: " + e, e); + } + } + + convertPortable = !cctx.cacheObjects().keepPortableInStore(cctx.name()); + } + + /** {@inheritDoc} */ + @Override protected void stop0(boolean cancel) { + if (store instanceof LifecycleAware) { + try { + // Avoid second start() call on store in case when near cache is enabled. + if (cctx.config().isWriteBehindEnabled()) { + if (!cctx.isNear()) + ((LifecycleAware)store).stop(); + } + } + catch (Exception e) { + U.error(log(), "Failed to stop cache store.", e); + } + } + } + + /** + * @return Convert-portable flag. + */ + public boolean convertPortable() { + return convertPortable; + } + + /** + * @param convertPortable Convert-portable flag. + */ + public void convertPortable(boolean convertPortable) { + this.convertPortable = convertPortable; + } + + /** + * @return {@code true} If local store is configured. 
+ */ + public boolean isLocalStore() { + return locStore; + } + + /** + * @return {@code true} If store configured. + */ + public boolean configured() { + return store != null; + } + + /** + * Loads data from persistent store. + * + * @param tx Cache transaction. + * @param key Cache key. + * @return Loaded value, possibly <tt>null</tt>. + * @throws IgniteCheckedException If data loading failed. + */ + @SuppressWarnings("unchecked") + @Nullable public Object loadFromStore(@Nullable IgniteInternalTx tx, KeyCacheObject key) + throws IgniteCheckedException { + return loadFromStore(tx, key, true); + } + + /** + * Loads data from persistent store. + * + * @param tx Cache transaction. + * @param key Cache key. + * @param convert Convert flag. + * @return Loaded value, possibly <tt>null</tt>. + * @throws IgniteCheckedException If data loading failed. + */ + @SuppressWarnings("unchecked") + @Nullable private Object loadFromStore(@Nullable IgniteInternalTx tx, + KeyCacheObject key, + boolean convert) + throws IgniteCheckedException { + if (store != null) { + if (key.internal()) + // Never load internal keys from store as they are never persisted. 
+ return null; + + Object storeKey = key.value(cctx.cacheObjectContext(), false); + + if (convertPortable) + storeKey = cctx.unwrapPortableIfNeeded(storeKey, false); + + if (log.isDebugEnabled()) + log.debug("Loading value from store for key: " + storeKey); + + initSession(tx); + + boolean thewEx = true; + + Object val = null; + + try { + val = singleThreadGate.load(storeKey); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheLoaderException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheLoaderException(e)); + } + finally { + endSession(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Loaded value from store [key=" + key + ", val=" + val + ']'); + + if (convert) { + val = convert(val); + + return val; + } + else + return val; + } + + return null; + } + + /** + * @param val Internal value. + * @return User value. + */ + @SuppressWarnings("unchecked") + private Object convert(Object val) { + if (val == null) + return null; + + return locStore ? ((IgniteBiTuple<Object, GridCacheVersion>)val).get1() : val; + } + + /** + * @return Whether DHT transaction can write to store from DHT. + */ + public boolean writeToStoreFromDht() { + return cctx.config().isWriteBehindEnabled() || locStore; + } + + /** + * @param tx Cache transaction. + * @param keys Cache keys. + * @param vis Closure to apply for loaded elements. + * @throws IgniteCheckedException If data loading failed. + */ + public void localStoreLoadAll(@Nullable IgniteInternalTx tx, + Collection<? extends KeyCacheObject> keys, + final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis) + throws IgniteCheckedException { + assert store != null; + assert locStore; + + loadAllFromStore(tx, keys, null, vis); + } + + /** + * Loads data from persistent store. + * + * @param tx Cache transaction. + * @param keys Cache keys. + * @param vis Closure. 
+ * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If data loading failed. + */ + @SuppressWarnings({"unchecked"}) + public boolean loadAllFromStore(@Nullable IgniteInternalTx tx, + Collection<? extends KeyCacheObject> keys, + final IgniteBiInClosure<KeyCacheObject, Object> vis) throws IgniteCheckedException { + if (store != null) { + loadAllFromStore(tx, keys, vis, null); + + return true; + } + else { + for (KeyCacheObject key : keys) + vis.apply(key, null); + } + + return false; + } + + /** + * @param tx Cache transaction. + * @param keys Keys to load. + * @param vis Key/value closure (only one of vis or verVis can be specified). + * @param verVis Key/value/version closure (only one of vis or verVis can be specified). + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private void loadAllFromStore(@Nullable IgniteInternalTx tx, + Collection<? extends KeyCacheObject> keys, + @Nullable final IgniteBiInClosure<KeyCacheObject, Object> vis, + @Nullable final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> verVis) + throws IgniteCheckedException { + assert vis != null ^ verVis != null; + assert verVis == null || locStore; + + final boolean convert = verVis == null; + + if (!keys.isEmpty()) { + if (keys.size() == 1) { + KeyCacheObject key = F.first(keys); + + if (convert) + vis.apply(key, loadFromStore(tx, key)); + else { + IgniteBiTuple<Object, GridCacheVersion> t = + (IgniteBiTuple<Object, GridCacheVersion>)loadFromStore(tx, key, false); + + if (t != null) + verVis.apply(key, t.get1(), t.get2()); + } + + return; + } + + Collection<Object> keys0; + + if (convertPortable) { + keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() { + @Override public Object apply(KeyCacheObject key) { + return cctx.unwrapPortableIfNeeded(key.value(cctx.cacheObjectContext(), false), false); + } + }); + } + else { + keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() { + @Override 
public Object apply(KeyCacheObject key) { + return key.value(cctx.cacheObjectContext(), false); + } + }); + } + + if (log.isDebugEnabled()) + log.debug("Loading values from store for keys: " + keys0); + + initSession(tx); + + boolean thewEx = true; + + try { + IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() { + @SuppressWarnings("ConstantConditions") + @Override public void apply(Object k, Object val) { + if (convert) { + Object v = convert(val); + + vis.apply(cctx.toCacheKeyObject(k), v); + } + else { + IgniteBiTuple<Object, GridCacheVersion> v = (IgniteBiTuple<Object, GridCacheVersion>)val; + + if (v != null) + verVis.apply(cctx.toCacheKeyObject(k), v.get1(), v.get2()); + } + } + }; + + if (keys.size() > singleThreadGate.loadAllThreshold()) { + Map<Object, Object> map = store.loadAll(keys0); + + if (map != null) { + for (Map.Entry<Object, Object> e : map.entrySet()) + c.apply(cctx.toCacheKeyObject(e.getKey()), e.getValue()); + } + } + else + singleThreadGate.loadAll(keys0, c); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheLoaderException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheLoaderException(e)); + } + finally { + endSession(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Loaded values from store for keys: " + keys0); + } + } + + /** + * Loads data from persistent store. + * + * @param vis Closer to cache loaded elements. + * @param args User arguments. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If data loading failed. 
+ */ + @SuppressWarnings({"ErrorNotRethrown", "unchecked"}) + public boolean loadCache(final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis, Object[] args) + throws IgniteCheckedException { + if (store != null) { + if (log.isDebugEnabled()) + log.debug("Loading all values from store."); + + initSession(null); + + boolean thewEx = true; + + try { + store.loadCache(new IgniteBiInClosure<Object, Object>() { + @Override public void apply(Object k, Object o) { + Object v; + GridCacheVersion ver = null; + + if (locStore) { + IgniteBiTuple<Object, GridCacheVersion> t = (IgniteBiTuple<Object, GridCacheVersion>)o; + + v = t.get1(); + ver = t.get2(); + } + else + v = o; + + KeyCacheObject cacheKey = cctx.toCacheKeyObject(k); + + vis.apply(cacheKey, v, ver); + } + }, args); + + thewEx = false; + } + catch (CacheLoaderException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheLoaderException(e)); + } + finally { + endSession(null, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Loaded all values from store."); + + return true; + } + + LT.warn(log, null, "Calling Cache.loadCache() method will have no effect, " + + "CacheConfiguration.getStore() is not defined for cache: " + cctx.namexx()); + + return false; + } + + /** + * Puts key-value pair into storage. + * + * @param tx Cache transaction. + * @param key Key. + * @param val Value. + * @param ver Version. + * @return {@code true} If there is a persistent storage. + * @throws IgniteCheckedException If storage failed. + */ + @SuppressWarnings("unchecked") + public boolean putToStore(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver) + throws IgniteCheckedException { + if (store != null) { + // Never persist internal keys. 
+ if (key instanceof GridCacheInternal) + return true; + + if (convertPortable) { + key = cctx.unwrapPortableIfNeeded(key, false); + val = cctx.unwrapPortableIfNeeded(val, false); + } + + if (log.isDebugEnabled()) + log.debug("Storing value in cache store [key=" + key + ", val=" + val + ']'); + + initSession(tx); + + boolean thewEx = true; + + try { + store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val)); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheWriterException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheWriterException(e)); + } + finally { + endSession(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Stored value in cache store [key=" + key + ", val=" + val + ']'); + + return true; + } + + return false; + } + + /** + * Puts key-value pair into storage. + * + * @param tx Cache transaction. + * @param map Map. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If storage failed. 
+ */ + public boolean putAllToStore(@Nullable IgniteInternalTx tx, + Map<Object, IgniteBiTuple<Object, GridCacheVersion>> map) + throws IgniteCheckedException + { + if (F.isEmpty(map)) + return true; + + if (map.size() == 1) { + Map.Entry<Object, IgniteBiTuple<Object, GridCacheVersion>> e = map.entrySet().iterator().next(); + + return putToStore(tx, e.getKey(), e.getValue().get1(), e.getValue().get2()); + } + else { + if (store != null) { + EntriesView entries = new EntriesView((Map)map); + + if (log.isDebugEnabled()) + log.debug("Storing values in cache store [entries=" + entries + ']'); + + initSession(tx); + + boolean thewEx = true; + + try { + store.writeAll(entries); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (Exception e) { + if (!(e instanceof CacheWriterException)) + e = new CacheWriterException(e); + + if (!entries.isEmpty()) { + List<Object> keys = new ArrayList<>(entries.size()); + + for (Cache.Entry<?, ?> entry : entries) + keys.add(entry.getKey()); + + throw new CacheStorePartialUpdateException(keys, e); + } + + throw new IgniteCheckedException(e); + } + finally { + endSession(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Stored value in cache store [entries=" + entries + ']'); + + return true; + } + + return false; + } + } + + /** + * @param tx Cache transaction. + * @param key Key. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If storage failed. + */ + @SuppressWarnings("unchecked") + public boolean removeFromStore(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException { + if (store != null) { + // Never remove internal key from store as it is never persisted. 
+ if (key instanceof GridCacheInternal) + return false; + + if (convertPortable) + key = cctx.unwrapPortableIfNeeded(key, false); + + if (log.isDebugEnabled()) + log.debug("Removing value from cache store [key=" + key + ']'); + + initSession(tx); + + boolean thewEx = true; + + try { + store.delete(key); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheWriterException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheWriterException(e)); + } + finally { + endSession(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Removed value from cache store [key=" + key + ']'); + + return true; + } + + return false; + } + + /** + * @param tx Cache transaction. + * @param keys Key. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If storage failed. + */ + @SuppressWarnings("unchecked") + public boolean removeAllFromStore(@Nullable IgniteInternalTx tx, Collection<Object> keys) + throws IgniteCheckedException { + if (F.isEmpty(keys)) + return true; + + if (keys.size() == 1) { + Object key = keys.iterator().next(); + + return removeFromStore(tx, key); + } + + if (store != null) { + Collection<Object> keys0 = convertPortable ? 
cctx.unwrapPortablesIfNeeded(keys, false) : keys; + + if (log.isDebugEnabled()) + log.debug("Removing values from cache store [keys=" + keys0 + ']'); + + initSession(tx); + + boolean thewEx = true; + + try { + store.deleteAll(keys0); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (Exception e) { + if (!(e instanceof CacheWriterException)) + e = new CacheWriterException(e); + + if (!keys0.isEmpty()) + throw new CacheStorePartialUpdateException(keys0, e); + + throw new IgniteCheckedException(e); + } + finally { + endSession(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Removed values from cache store [keys=" + keys0 + ']'); + + return true; + } + + return false; + } + + /** + * @return Store. + */ + public CacheStore<Object, Object> store() { + return store; + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void forceFlush() throws IgniteCheckedException { + if (store instanceof GridCacheWriteBehindStore) + ((GridCacheWriteBehindStore)store).forceFlush(); + } + + /** + * @param tx Transaction. + * @param commit Commit. + * @throws IgniteCheckedException If failed. + */ + public void txEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException { + assert store != null; + + initSession(tx); + + try { + store.sessionEnd(commit); + } + finally { + if (sesHolder != null) { + sesHolder.set(null); + + tx.removeMeta(SES_ATTR); + } + } + } + + /** + * @param e Class cast exception. + * @throws IgniteCheckedException Thrown exception. + */ + private void handleClassCastException(ClassCastException e) throws IgniteCheckedException { + assert e != null; + + if (e.getMessage() != null) { + throw new IgniteCheckedException("Cache store must work with portable objects if portables are " + + "enabled for cache [cacheName=" + cctx.namex() + ']', e); + } + else + throw e; + } + + /** + * Clears session holder. 
+ */ + void endSession(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException { + try { + if (tx == null) + store.sessionEnd(threwEx); + } + catch (Exception e) { + if (!threwEx) + throw U.cast(e); + } + finally { + if (sesHolder != null) + sesHolder.set(null); + } + } + + /** + * @param tx Current transaction. + */ + void initSession(@Nullable IgniteInternalTx tx) { + if (sesHolder == null) + return; + + assert sesHolder.get() == null; + + SessionData ses; + + if (tx != null) { + ses = tx.meta(SES_ATTR); + + if (ses == null) { + ses = new SessionData(tx, cctx.name()); + + tx.addMeta(SES_ATTR, ses); + } + else + // Session cache name may change in cross-cache transaction. + ses.cacheName(cctx.name()); + } + else + ses = new SessionData(null, cctx.name()); + + sesHolder.set(ses); + } + + /** + * + */ + private static class SessionData { + /** */ + @GridToStringExclude + private final IgniteInternalTx tx; + + /** */ + private String cacheName; + + /** */ + @GridToStringInclude + private Map<Object, Object> props; + + /** + * @param tx Current transaction. + * @param cacheName Cache name. + */ + private SessionData(@Nullable IgniteInternalTx tx, @Nullable String cacheName) { + this.tx = tx; + this.cacheName = cacheName; + } + + /** + * @return Transaction. + */ + @Nullable private Transaction transaction() { + return tx != null ? tx.proxy() : null; + } + + /** + * @return Properties. + */ + private Map<Object, Object> properties() { + if (props == null) + props = new GridLeanMap<>(); + + return props; + } + + /** + * @return Cache name. + */ + private String cacheName() { + return cacheName; + } + + /** + * @param cacheName Cache name. 
+ */ + private void cacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SessionData.class, this, "tx", CU.txString(tx)); + } + } + + /** + * + */ + private static class ThreadLocalSession implements CacheStoreSession { + /** */ + private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>(); + + /** {@inheritDoc} */ + @Nullable @Override public Transaction transaction() { + SessionData ses0 = sesHolder.get(); + + return ses0 != null ? ses0.transaction() : null; + } + + /** {@inheritDoc} */ + @Override public boolean isWithinTransaction() { + return transaction() != null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <K1, V1> Map<K1, V1> properties() { + SessionData ses0 = sesHolder.get(); + + return ses0 != null ? (Map<K1, V1>)ses0.properties() : null; + } + + /** {@inheritDoc} */ + @Nullable @Override public String cacheName() { + SessionData ses0 = sesHolder.get(); + + return ses0 != null ? ses0.cacheName() : null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ThreadLocalSession.class, this); + } + } + + /** + * + */ + @SuppressWarnings("unchecked") + private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> { + /** */ + private final Map<?, IgniteBiTuple<?, GridCacheVersion>> map; + + /** */ + private Set<Object> rmvd; + + /** */ + private boolean cleared; + + /** + * @param map Map. + */ + private EntriesView(Map<?, IgniteBiTuple<?, GridCacheVersion>> map) { + assert map != null; + + this.map = map; + } + + /** {@inheritDoc} */ + @Override public int size() { + return cleared ? 0 : (map.size() - (rmvd != null ? 
rmvd.size() : 0)); + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + return cleared || !iterator().hasNext(); + } + + /** {@inheritDoc} */ + @Override public boolean contains(Object o) { + if (cleared || !(o instanceof Cache.Entry)) + return false; + + Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o; + + return map.containsKey(e.getKey()); + } + + /** {@inheritDoc} */ + @NotNull @Override public Iterator<Cache.Entry<?, ?>> iterator() { + if (cleared) + return F.emptyIterator(); + + final Iterator<Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>>> it0 = (Iterator)map.entrySet().iterator(); + + return new Iterator<Cache.Entry<?, ?>>() { + /** */ + private Cache.Entry<?, ?> cur; + + /** */ + private Cache.Entry<?, ?> next; + + /** + * + */ + { + checkNext(); + } + + /** + * + */ + private void checkNext() { + while (it0.hasNext()) { + Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>> e = it0.next(); + + Object k = e.getKey(); + + if (rmvd != null && rmvd.contains(k)) + continue; + + Object v = locStore ? e.getValue() : e.getValue().get1(); + + if (convertPortable) { + k = cctx.unwrapPortableIfNeeded(k, false); + v = cctx.unwrapPortableIfNeeded(v, false); + } + + next = new CacheEntryImpl<>(k, v); + + break; + } + } + + @Override public boolean hasNext() { + return next != null; + } + + @Override public Cache.Entry<?, ?> next() { + if (next == null) + throw new NoSuchElementException(); + + cur = next; + + next = null; + + checkNext(); + + return cur; + } + + @Override public void remove() { + if (cur == null) + throw new IllegalStateException(); + + addRemoved(cur); + + cur = null; + } + }; + } + + /** {@inheritDoc} */ + @Override public boolean add(Cache.Entry<?, ?> entry) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean addAll(Collection<? 
extends Cache.Entry<?, ?>> col) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean remove(Object o) { + if (cleared || !(o instanceof Cache.Entry)) + return false; + + Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o; + + if (rmvd != null && rmvd.contains(e.getKey())) + return false; + + if (mapContains(e)) { + addRemoved(e); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean containsAll(Collection<?> col) { + if (cleared) + return false; + + for (Object o : col) { + if (contains(o)) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean removeAll(Collection<?> col) { + if (cleared) + return false; + + boolean modified = false; + + for (Object o : col) { + if (remove(o)) + modified = true; + } + + return modified; + } + + /** {@inheritDoc} */ + @Override public boolean retainAll(Collection<?> col) { + if (cleared) + return false; + + boolean modified = false; + + for (Cache.Entry<?, ?> e : this) { + if (!col.contains(e)) { + addRemoved(e); + + modified = true; + } + } + + return modified; + } + + /** {@inheritDoc} */ + @Override public void clear() { + cleared = true; + } + + /** + * @param e Entry. + */ + private void addRemoved(Cache.Entry<?, ?> e) { + if (rmvd == null) + rmvd = new HashSet<>(); + + rmvd.add(e.getKey()); + } + + /** + * @param e Entry. + * @return {@code True} if original map contains entry. + */ + private boolean mapContains(Cache.Entry<?, ?> e) { + return map.containsKey(e.getKey()); + } + + /** {@inheritDoc} */ + public String toString() { + Iterator<Cache.Entry<?, ?>> it = iterator(); + + if (!it.hasNext()) + return "[]"; + + SB sb = new SB("["); + + while (true) { + Cache.Entry<?, ?> e = it.next(); + + sb.a(e.toString()); + + if (!it.hasNext()) + return sb.a(']').toString(); + + sb.a(", "); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java new file mode 100644 index 0000000..9a35d5e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java @@ -0,0 +1,1015 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.thread.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; + +import javax.cache.integration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static javax.cache.Cache.*; + +/** + * Internal wrapper for a {@link CacheStore} that enables write-behind logic. + * <p/> + * The general purpose of this approach is to reduce cache store load under high + * store update rate. The idea is to cache all write and remove operations in a pending + * map and delegate these changes to the underlying store either after timeout or + * if size of a pending map exceeded some pre-configured value. Another performance gain + * is achieved due to combining a group of similar operations to a single batch update. + * <p/> + * The essential flush size for the write-behind cache should be at least the estimated + * count of simultaneously written keys. In case of significantly smaller value there would + * be triggered a lot of flush events that will result in a high cache store load. + * <p/> + * Since write operations to the cache store are deferred, transaction support is lost; no + * transaction objects are passed to the underlying store. + */ +public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware { + /** Default write cache initial capacity. 
*/ + public static final int DFLT_INITIAL_CAPACITY = 1024; + + /** Overflow ratio for critical cache size calculation. */ + public static final float CACHE_OVERFLOW_RATIO = 1.5f; + + /** Default concurrency level of write cache. */ + public static final int DFLT_CONCUR_LVL = 64; + + /** Write cache initial capacity. */ + private int initCap = DFLT_INITIAL_CAPACITY; + + /** Concurrency level for write cache access. */ + private int concurLvl = DFLT_CONCUR_LVL; + + /** When cache size exceeds this value eldest entry will be stored to the underlying store. */ + private int cacheMaxSize = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE; + + /** Critical cache size. If cache size exceeds this value, data flush performed synchronously. */ + private int cacheCriticalSize; + + /** Count of worker threads performing underlying store updates. */ + private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT; + + /** Cache flush frequency. All pending operations will be performed in not less then this value ms. */ + private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY; + + /** Maximum batch size for put and remove operations */ + private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE; + + /** Grid name. */ + private String gridName; + + /** Cache name. */ + private String cacheName; + + /** Underlying store. */ + private CacheStore<K, V> store; + + /** Write cache. */ + private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache; + + /** Flusher threads. */ + private GridWorker[] flushThreads; + + /** Atomic flag indicating store shutdown. */ + private AtomicBoolean stopping = new AtomicBoolean(true); + + /** Flush lock. */ + private Lock flushLock = new ReentrantLock(); + + /** Condition to determine records available for flush. */ + private Condition canFlush = flushLock.newCondition(); + + /** Variable for counting total cache overflows. 
*/ + private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger(); + + /** Variable contains current number of overflow events. */ + private AtomicInteger cacheOverflowCntr = new AtomicInteger(); + + /** Variable for counting key-value pairs that are in {@link ValueStatus#RETRY} state. */ + private AtomicInteger retryEntriesCnt = new AtomicInteger(); + + /** Log. */ + private IgniteLogger log; + + /** Store manager. */ + private GridCacheOsStoreManager storeMgr; + + /** + * Creates a write-behind cache store for the given store. + * + * @param storeMgr Store manager. + * @param gridName Grid name. + * @param cacheName Cache name. + * @param log Grid logger. + * @param store {@code GridCacheStore} that need to be wrapped. + */ + public GridCacheWriteBehindStore( + GridCacheOsStoreManager storeMgr, + String gridName, + String cacheName, + IgniteLogger log, + CacheStore<K, V> store) { + this.storeMgr = storeMgr; + this.gridName = gridName; + this.cacheName = cacheName; + this.log = log; + this.store = store; + } + + /** + * Sets initial capacity for the write cache. + * + * @param initCap Initial capacity. + */ + public void setInitialCapacity(int initCap) { + this.initCap = initCap; + } + + /** + * Sets concurrency level for the write cache. Concurrency level is expected count of concurrent threads + * attempting to update cache. + * + * @param concurLvl Concurrency level. + */ + public void setConcurrencyLevel(int concurLvl) { + this.concurLvl = concurLvl; + } + + /** + * Sets the maximum size of the write cache. When the count of unique keys in write cache exceeds this value, + * the eldest entry in the cache is immediately scheduled for write to the underlying store. + * + * @param cacheMaxSize Max cache size. + */ + public void setFlushSize(int cacheMaxSize) { + this.cacheMaxSize = cacheMaxSize; + } + + /** + * Gets the maximum size of the write-behind buffer. 
    /**
     * Gets the maximum size of the write-behind buffer. When the count of unique keys
     * in write buffer exceeds this value, the buffer is scheduled for write to the underlying store.
     * <p/>
     * If this value is {@code 0}, then flush is performed only on time-elapsing basis. However,
     * when this value is {@code 0}, the cache critical size is set to
     * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}
     *
     * @return Buffer size that triggers flush procedure.
     */
    public int getWriteBehindFlushSize() {
        return cacheMaxSize;
    }

    /**
     * Sets the number of threads that will perform store update operations.
     *
     * @param flushThreadCnt Count of worker threads.
     */
    public void setFlushThreadCount(int flushThreadCnt) {
        this.flushThreadCnt = flushThreadCnt;
    }

    /**
     * Gets the number of flush threads that will perform store update operations.
     *
     * @return Count of worker threads.
     */
    public int getWriteBehindFlushThreadCount() {
        return flushThreadCnt;
    }

    /**
     * Sets the cache flush frequency. All pending operations on the underlying store will be performed
     * within time interval not less then this value.
     *
     * @param cacheFlushFreq Time interval value in milliseconds.
     */
    public void setFlushFrequency(long cacheFlushFreq) {
        this.cacheFlushFreq = cacheFlushFreq;
    }

    /**
     * Gets the cache flush frequency. All pending operations on the underlying store will be performed
     * within time interval not less then this value.
     * <p/>
     * If this value is {@code 0}, then flush is performed only when buffer size exceeds flush size.
     *
     * @return Flush frequency in milliseconds.
     */
    public long getWriteBehindFlushFrequency() {
        return cacheFlushFreq;
    }

    /**
     * Sets the maximum count of similar operations that can be grouped to a single batch.
     *
     * @param batchSize Maximum count of batch.
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * Gets the maximum count of similar (put or remove) operations that can be grouped to a single batch.
     *
     * @return Maximum size of batch.
     */
    public int getWriteBehindStoreBatchSize() {
        return batchSize;
    }

    /**
     * Gets count of entries that were processed by the write-behind store and have not been
     * flushed to the underlying store yet.
     *
     * @return Total count of entries in cache store internal buffer.
     */
    public int getWriteBehindBufferSize() {
        return writeCache.sizex();
    }

    /**
     * @return Underlying store.
     */
    public CacheStore<K, V> store() {
        return store;
    }

    /**
     * Performs all the initialization logic for write-behind cache store.
     * This class must not be used until this method returns.
     */
    @Override public void start() {
        // At least one flush trigger (size-based or time-based) must be configured.
        assert cacheFlushFreq != 0 || cacheMaxSize != 0;

        // CAS guards against double start; 'stopping' is initially true.
        if (stopping.compareAndSet(true, false)) {
            if (log.isDebugEnabled())
                log.debug("Starting write-behind store for cache '" + cacheName + '\'');

            // Critical size: hard limit past which callers flush synchronously.
            cacheCriticalSize = (int)(cacheMaxSize * CACHE_OVERFLOW_RATIO);

            if (cacheCriticalSize == 0)
                cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE;

            flushThreads = new GridWorker[flushThreadCnt];

            writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl);

            // Spawn one dedicated flusher thread per configured worker.
            for (int i = 0; i < flushThreads.length; i++) {
                flushThreads[i] = new Flusher(gridName, "flusher-" + i, log);

                new IgniteThread(flushThreads[i]).start();
            }
        }
    }

    /**
     * Gets count of write buffer overflow events since initialization. Each overflow event causes
     * the ongoing flush operation to be performed synchronously.
     *
     * @return Count of cache overflow events since start.
     */
    public int getWriteBehindTotalCriticalOverflowCount() {
        return cacheTotalOverflowCntr.get();
    }
Each overflow event causes + * the ongoing flush operation to be performed synchronously. + * + * @return Count of cache overflow events since start. + */ + public int getWriteBehindCriticalOverflowCount() { + return cacheOverflowCntr.get(); + } + + /** + * Gets count of cache entries that are in a store-retry state. An entry is assigned a store-retry state + * when underlying store failed due some reason and cache has enough space to retain this entry till + * the next try. + * + * @return Count of entries in store-retry state. + */ + public int getWriteBehindErrorRetryCount() { + return retryEntriesCnt.get(); + } + + /** + * Performs shutdown logic for store. No put, get and remove requests will be processed after + * this method is called. + */ + @Override public void stop() { + if (stopping.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Stopping write-behind store for cache '" + cacheName + '\''); + + wakeUp(); + + boolean graceful = true; + + for (GridWorker worker : flushThreads) + graceful &= U.join(worker, log); + + if (!graceful) + log.warning("Shutdown was aborted"); + } + } + + /** + * Forces all entries collected to be flushed to the underlying store. + * @throws IgniteCheckedException If failed. + */ + public void forceFlush() throws IgniteCheckedException { + wakeUp(); + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) { + store.loadCache(clo, args); + } + + /** {@inheritDoc} */ + @Override public Map<K, V> loadAll(Iterable<? 
extends K> keys) { + if (log.isDebugEnabled()) + log.debug("Store load all [keys=" + keys + ']'); + + Map<K, V> loaded = new HashMap<>(); + + Collection<K> remaining = new LinkedList<>(); + + for (K key : keys) { + StatefulValue<K, V> val = writeCache.get(key); + + if (val != null) { + val.readLock().lock(); + + try { + if (val.operation() == StoreOperation.PUT) + loaded.put(key, val.entry().getValue()); + else + assert val.operation() == StoreOperation.RMV : val.operation(); + } + finally { + val.readLock().unlock(); + } + } + else + remaining.add(key); + } + + // For items that were not found in queue. + if (!remaining.isEmpty()) { + Map<K, V> loaded0 = store.loadAll(remaining); + + if (loaded0 != null) + loaded.putAll(loaded0); + } + + return loaded; + } + + /** {@inheritDoc} */ + @Override public V load(K key) { + if (log.isDebugEnabled()) + log.debug("Store load [key=" + key + ']'); + + StatefulValue<K, V> val = writeCache.get(key); + + if (val != null) { + val.readLock().lock(); + + try { + switch (val.operation()) { + case PUT: + return val.entry().getValue(); + + case RMV: + return null; + + default: + assert false : "Unexpected operation: " + val.status(); + } + } + finally { + val.readLock().unlock(); + } + } + + return store.load(key); + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Entry<? extends K, ? extends V>> entries) { + for (Entry<? extends K, ? extends V> e : entries) + write(e); + } + + /** {@inheritDoc} */ + @Override public void write(Entry<? extends K, ? 
    /** {@inheritDoc} */
    @Override public void write(Entry<? extends K, ? extends V> entry) {
        try {
            if (log.isDebugEnabled())
                log.debug("Store put [key=" + entry.getKey() + ", val=" + entry.getValue() + ']');

            updateCache(entry.getKey(), entry, StoreOperation.PUT);
        }
        catch (IgniteInterruptedCheckedException e) {
            // javax.cache API expects unchecked CacheWriterException.
            throw new CacheWriterException(U.convertExceptionNoWrap(e));
        }
    }

    /** {@inheritDoc} */
    @Override public void deleteAll(Collection<?> keys) {
        for (Object key : keys)
            delete(key);
    }

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override public void delete(Object key) {
        try {
            if (log.isDebugEnabled())
                log.debug("Store remove [key=" + key + ']');

            updateCache((K)key, null, StoreOperation.RMV);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new CacheWriterException(U.convertExceptionNoWrap(e));
        }
    }

    /** {@inheritDoc} */
    @Override public void sessionEnd(boolean commit) {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridCacheWriteBehindStore.class, this);
    }

    /**
     * Performs flush-consistent cache update for the given key.
     *
     * @param key Key for which update is performed.
     * @param val New value, may be null for remove operation.
     * @param operation Updated value status
     * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
     */
    private void updateCache(K key,
        @Nullable Entry<? extends K, ? extends V> val,
        StoreOperation operation)
        throws IgniteInterruptedCheckedException {
        StatefulValue<K, V> newVal = new StatefulValue<>(val, operation);

        StatefulValue<K, V> prev;

        // Loop until we either insert a fresh entry or merge into an existing one.
        while ((prev = writeCache.putIfAbsent(key, newVal)) != null) {
            prev.writeLock().lock();

            try {
                if (prev.status() == ValueStatus.PENDING) {
                    // Flush process in progress, try again.
                    prev.waitForFlush();

                    continue;
                }
                else if (prev.status() == ValueStatus.FLUSHED)
                    // This entry was deleted from map before we acquired the lock.
                    continue;
                else if (prev.status() == ValueStatus.RETRY)
                    // New value has come, old value is no longer in RETRY state,
                    retryEntriesCnt.decrementAndGet();

                assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY;

                // Overwrite the queued operation in place under its write lock.
                prev.update(val, operation, ValueStatus.NEW);

                break;
            }
            finally {
                prev.writeLock().unlock();
            }
        }

        // Now check the map size
        if (writeCache.sizex() > cacheCriticalSize)
            // Perform single store update in the same thread.
            flushSingleValue();
        else if (cacheMaxSize > 0 && writeCache.sizex() > cacheMaxSize)
            // Soft limit exceeded: hand the work off to flusher threads.
            wakeUp();
    }

    /**
     * Flushes one upcoming value to the underlying store. Called from
     * {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds
     * critical size.
     */
    private void flushSingleValue() {
        cacheOverflowCntr.incrementAndGet();

        try {
            Map<K, StatefulValue<K, V>> batch = null;

            // Scan eldest-first until one entry can be claimed for flushing.
            for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) {
                StatefulValue<K, V> val = e.getValue();

                val.writeLock().lock();

                try {
                    ValueStatus status = val.status();

                    if (acquired(status))
                        // Another thread is helping us, continue to the next entry.
                        continue;

                    if (val.status() == ValueStatus.RETRY)
                        retryEntriesCnt.decrementAndGet();

                    assert retryEntriesCnt.get() >= 0;

                    val.status(ValueStatus.PENDING);

                    batch = Collections.singletonMap(e.getKey(), val);
                }
                finally {
                    val.writeLock().unlock();
                }

                // Reached only when this iteration claimed an entry ('continue' skips it).
                if (!batch.isEmpty()) {
                    // NOTE(review): initSes=false here, unlike the flusher path — presumably the
                    // calling thread's store session is already active; confirm against storeMgr usage.
                    applyBatch(batch, false);

                    cacheTotalOverflowCntr.incrementAndGet();

                    return;
                }
            }
        }
        finally {
            cacheOverflowCntr.decrementAndGet();
        }
    }
    /**
     * Performs batch operation on underlying store.
     *
     * @param valMap Batch map. All values must share one operation and be in {@link ValueStatus#PENDING} state.
     * @param initSes {@code True} if need to initialize session.
     */
    private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes) {
        assert valMap.size() <= batchSize;

        StoreOperation operation = null;

        // Construct a map for underlying store
        Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size());

        for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
            if (operation == null)
                operation = e.getValue().operation();

            // Batches are homogeneous: a single PUT or RMV operation per batch.
            assert operation == e.getValue().operation();

            assert e.getValue().status() == ValueStatus.PENDING;

            batch.put(e.getKey(), e.getValue().entry());
        }

        if (updateStore(operation, batch, initSes)) {
            // Success (or deliberate drop): mark flushed, remove from buffer, release waiters.
            for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
                StatefulValue<K, V> val = e.getValue();

                val.writeLock().lock();

                try {
                    val.status(ValueStatus.FLUSHED);

                    StatefulValue<K, V> prev = writeCache.remove(e.getKey());

                    // Additional check to ensure consistency.
                    assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";

                    val.signalFlushed();
                }
                finally {
                    val.writeLock().unlock();
                }
            }
        }
        else {
            // Exception occurred, we must set RETRY status
            for (StatefulValue<K, V> val : valMap.values()) {
                val.writeLock().lock();

                try {
                    val.status(ValueStatus.RETRY);

                    retryEntriesCnt.incrementAndGet();

                    // Release threads blocked in updateCache() waiting on this value.
                    val.signalFlushed();
                }
                finally {
                    val.writeLock().unlock();
                }
            }
        }
    }

    /**
     * Tries to update store with the given values and returns {@code true} in case of success.
     *
     * <p/> If any exception in underlying store is occurred, this method checks the map size.
     * If map size exceeds some critical value, then it returns {@code true} and this value will
     * be lost. If map size does not exceed critical value, it will return false and value will
     * be retained in write cache.
     *
     * @param operation Status indicating operation that should be performed.
     * @param vals Key-Value map.
     * @param initSes {@code True} if need to initialize session.
     * @return {@code true} if value may be deleted from the write cache,
     *      {@code false} otherwise
     */
    private boolean updateStore(StoreOperation operation,
        Map<K, Entry<? extends K, ? extends V>> vals,
        boolean initSes) {

        if (initSes && storeMgr != null)
            storeMgr.initSession(null);

        try {
            // Tracks whether the store call completed so endSession() gets an accurate flag.
            boolean threwEx = true;

            try {
                switch (operation) {
                    case PUT:
                        store.writeAll(vals.values());

                        break;

                    case RMV:
                        store.deleteAll(vals.keySet());

                        break;

                    default:
                        assert false : "Unexpected operation: " + operation;
                }

                threwEx = false;

                return true;
            }
            finally {
                if (initSes && storeMgr != null)
                    storeMgr.endSession(null, threwEx);
            }
        }
        catch (Exception e) {
            LT.warn(log, e, "Unable to update underlying store: " + store);

            // Drop the values only when retaining them would exceed the critical size
            // or the node is stopping; otherwise keep them for retry.
            if (writeCache.sizex() > cacheCriticalSize || stopping.get()) {
                for (Map.Entry<K, Entry<? extends K, ? extends V>> entry : vals.entrySet()) {
                    Object val = entry.getValue() != null ? entry.getValue().getValue() : null;

                    log.warning("Failed to update store (value will be lost as current buffer size is greater " +
                        "than 'cacheCriticalSize' or node has been stopped before store was repaired) [key=" +
                        entry.getKey() + ", val=" + val + ", op=" + operation + "]");
                }

                return true;
            }

            return false;
        }
    }

    /**
     * Wakes up flushing threads if map size exceeded maximum value or in case of shutdown.
     */
    private void wakeUp() {
        flushLock.lock();

        try {
            canFlush.signalAll();
        }
        finally {
            flushLock.unlock();
        }
    }
+ */ + private class Flusher extends GridWorker { + /** {@inheritDoc */ + protected Flusher(String gridName, String name, IgniteLogger log) { + super(gridName, name, log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + while (!stopping.get() || writeCache.sizex() > 0) { + awaitOperationsAvailable(); + + flushCache(writeCache.entrySet().iterator()); + } + } + + /** + * This method awaits until enough elements in map are available or given timeout is over. + * + * @throws InterruptedException If awaiting was interrupted. + */ + private void awaitOperationsAvailable() throws InterruptedException { + flushLock.lock(); + + try { + do { + if (writeCache.sizex() <= cacheMaxSize || cacheMaxSize == 0) { + if (cacheFlushFreq > 0) + canFlush.await(cacheFlushFreq, TimeUnit.MILLISECONDS); + else + canFlush.await(); + } + } + while (writeCache.sizex() == 0 && !stopping.get()); + } + finally { + flushLock.unlock(); + } + } + + /** + * Removes values from the write cache and performs corresponding operation + * on the underlying store. + * + * @param it Iterator for write cache. + */ + private void flushCache(Iterator<Map.Entry<K,StatefulValue<K, V>>> it) { + StoreOperation operation = null; + + Map<K, StatefulValue<K, V>> batch = null; + Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(batchSize); + + while (it.hasNext()) { + Map.Entry<K, StatefulValue<K, V>> e = it.next(); + + StatefulValue<K, V> val = e.getValue(); + + val.writeLock().lock(); + + try { + ValueStatus status = val.status(); + + if (acquired(status)) + // Another thread is helping us, continue to the next entry. + continue; + + if (status == ValueStatus.RETRY) + retryEntriesCnt.decrementAndGet(); + + assert retryEntriesCnt.get() >= 0; + + val.status(ValueStatus.PENDING); + + // We scan for the next operation and apply batch on operation change. Null means new batch. 
+ if (operation == null) + operation = val.operation(); + + if (operation != val.operation()) { + // Operation is changed, so we need to perform a batch. + batch = pending; + pending = U.newLinkedHashMap(batchSize); + + operation = val.operation(); + + pending.put(e.getKey(), val); + } + else + pending.put(e.getKey(), val); + + if (pending.size() == batchSize) { + batch = pending; + pending = U.newLinkedHashMap(batchSize); + + operation = null; + } + } + finally { + val.writeLock().unlock(); + } + + if (batch != null && !batch.isEmpty()) { + applyBatch(batch, true); + batch = null; + } + } + + // Process the remainder. + if (!pending.isEmpty()) + applyBatch(pending, true); + } + } + + /** + * For test purposes only. + * + * @return Write cache for the underlying store operations. + */ + Map<K, StatefulValue<K, V>> writeCache() { + return writeCache; + } + + /** + * Enumeration that represents possible operations on the underlying store. + */ + private enum StoreOperation { + /** Put key-value pair to the underlying store. */ + PUT, + + /** Remove key from the underlying store. */ + RMV + } + + /** + * Enumeration that represents possible states of value in the map. + */ + private enum ValueStatus { + /** Value is scheduled for write or delete from the underlying cache but has not been captured by flusher. */ + NEW, + + /** Value is captured by flusher and store operation is performed at the moment. */ + PENDING, + + /** Store operation has failed and it will be re-tried at the next flush. */ + RETRY, + + /** Store operation succeeded and this value will be removed by flusher. */ + FLUSHED, + } + + /** + * Checks if given status indicates pending or complete flush operation. + * + * @param status Status to check. + * @return {@code true} if status indicates any pending or complete store update operation. 
    /**
     * Checks if given status indicates pending or complete flush operation.
     *
     * @param status Status to check.
     * @return {@code true} if status indicates any pending or complete store update operation.
     */
    private boolean acquired(ValueStatus status) {
        return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED;
    }

    /**
     * A state-value-operation trio. Extends {@link ReentrantReadWriteLock} so each value
     * carries its own lock guarding the mutable fields below.
     *
     * @param <K> Key type.
     * @param <V> Value type.
     */
    private static class StatefulValue<K, V> extends ReentrantReadWriteLock {
        /** */
        private static final long serialVersionUID = 0L;

        /** Value. Guarded by this lock. */
        @GridToStringInclude
        private Entry<? extends K, ? extends V> val;

        /** Store operation. Guarded by this lock. */
        private StoreOperation storeOperation;

        /** Value status. Guarded by this lock. */
        private ValueStatus valStatus;

        /** Condition to wait for flush event. Bound to the write lock. */
        private Condition flushCond = writeLock().newCondition();

        /**
         * Creates a state-value pair with {@link ValueStatus#NEW} status.
         *
         * @param val Value.
         * @param storeOperation Store operation.
         */
        private StatefulValue(Entry<? extends K, ? extends V> val, StoreOperation storeOperation) {
            assert storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV;

            this.val = val;
            this.storeOperation = storeOperation;
            valStatus = ValueStatus.NEW;
        }

        /**
         * @return Stored value.
         */
        private Entry<? extends K, ? extends V> entry() {
            return val;
        }

        /**
         * @return Store operation.
         */
        private StoreOperation operation() {
            return storeOperation;
        }

        /**
         * @return Value status
         */
        private ValueStatus status() {
            return valStatus;
        }

        /**
         * Updates value status.
         *
         * @param valStatus Value status.
         */
        private void status(ValueStatus valStatus) {
            this.valStatus = valStatus;
        }

        /**
         * Updates both value and value status.
         *
         * @param val Value.
         * @param storeOperation Store operation.
         * @param valStatus Value status.
         */
        private void update(@Nullable Entry<? extends K, ? extends V> val,
            StoreOperation storeOperation,
            ValueStatus valStatus) {
            this.val = val;
            this.storeOperation = storeOperation;
            this.valStatus = valStatus;
        }

        /**
         * Awaits a signal on flush condition. Caller must hold the write lock.
         *
         * @throws IgniteInterruptedCheckedException If thread was interrupted.
         */
        private void waitForFlush() throws IgniteInterruptedCheckedException {
            U.await(flushCond);
        }

        /**
         * Signals flush condition. Caller must hold the write lock.
         */
        @SuppressWarnings({"SignalWithoutCorrespondingAwait"})
        private void signalFlushed() {
            flushCond.signalAll();
        }

        /** {@inheritDoc} */
        // NOTE(review): equality ignores storeOperation — appears intentional but worth confirming.
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (!(o instanceof StatefulValue))
                return false;

            StatefulValue other = (StatefulValue)o;

            return F.eq(val, other.val) && F.eq(valStatus, other.valStatus);
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            int res = val != null ? val.hashCode() : 0;

            res = 31 * res + valStatus.hashCode();

            return res;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(StatefulValue.class, this);
        }
    }
}
org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.util.*; @@ -444,11 +445,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter * * @return Store manager. */ - protected GridCacheStoreManager store() { + protected GridCacheOsStoreManager store() { if (!activeCacheIds().isEmpty()) { int cacheId = F.first(activeCacheIds()); - GridCacheStoreManager store = cctx.cacheContext(cacheId).store(); + GridCacheOsStoreManager store = cctx.cacheContext(cacheId).store(); return store.configured() ? store : null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 9d7f332..e051385 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.dr.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.dr.*; import 
org.apache.ignite.internal.transactions.*; @@ -492,7 +493,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter */ @SuppressWarnings({"CatchGenericClass"}) protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException { - GridCacheStoreManager store = store(); + GridCacheOsStoreManager store = store(); if (store != null && store.writeThrough() && storeEnabled() && (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) { @@ -500,7 +501,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (writeEntries != null) { Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null; List<Object> rmvCol = null; - GridCacheStoreManager writeStore = null; + GridCacheOsStoreManager writeStore = null; boolean skipNear = near() && store.writeToStoreFromDht(); @@ -981,7 +982,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else { - GridCacheStoreManager store = store(); + GridCacheOsStoreManager store = store(); if (store != null && (!internal() || groupLock())) { try { @@ -1086,7 +1087,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx.tm().rollbackTx(this); - GridCacheStoreManager store = store(); + GridCacheOsStoreManager store = store(); if (store != null && (near() || store.writeToStoreFromDht())) { if (!internal() || groupLock()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index ccc6aae..32e0caf 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -389,10 +389,10 @@ 
org.apache.ignite.internal.processors.cache.GridCacheProcessor$LocalAffinityFunc org.apache.ignite.internal.processors.cache.GridCacheProjectionImpl org.apache.ignite.internal.processors.cache.GridCacheProxyImpl org.apache.ignite.internal.processors.cache.GridCacheReturn -org.apache.ignite.internal.processors.cache.GridCacheStoreManager$1 -org.apache.ignite.internal.processors.cache.GridCacheStoreManager$2 -org.apache.ignite.internal.processors.cache.GridCacheStoreManager$3 -org.apache.ignite.internal.processors.cache.GridCacheStoreManager$4 +org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$1 +org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$2 +org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$3 +org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$4 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$10 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$12 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$14 @@ -432,9 +432,9 @@ org.apache.ignite.internal.processors.cache.GridCacheUtils$8 org.apache.ignite.internal.processors.cache.GridCacheUtils$9 org.apache.ignite.internal.processors.cache.GridCacheValueCollection org.apache.ignite.internal.processors.cache.GridCacheValueCollection$1 -org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$StatefulValue -org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$StoreOperation -org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$ValueStatus +org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$StatefulValue +org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$StoreOperation +org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$ValueStatus org.apache.ignite.internal.processors.cache.GridPartitionLockKey org.apache.ignite.cache.CacheExistsException 
org.apache.ignite.internal.processors.cache.IgniteCacheProxy http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java index b9f9602..c4ba385 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java @@ -26,14 +26,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.junits.common.*; import org.apache.ignite.transactions.*; -import javax.cache.configuration.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; /** * Test that in {@link CacheMode#PARTITIONED} mode cache writes values only to the near cache store. <p/> This check - * is needed because in current implementation if {@link GridCacheWriteBehindStore} assumes that and user store is + * is needed because in current implementation if {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} assumes that and user store is * wrapped only in near cache (see {@link GridCacheProcessor} init logic). 
*/ @SuppressWarnings({"unchecked"}) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29377e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java deleted file mode 100644 index 883a216..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Harness for {@link GridCacheWriteBehindStore} tests. 
- */ -public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridCommonAbstractTest { - /** Write cache size. */ - public static final int CACHE_SIZE = 1024; - - /** Value dump interval. */ - public static final int FLUSH_FREQUENCY = 1000; - - /** Underlying store. */ - protected GridCacheTestStore delegate = new GridCacheTestStore(); - - /** Tested store. */ - protected GridCacheWriteBehindStore<Integer, String> store; - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - delegate = null; - store = null; - - super.afterTestsStopped(); - } - - /** - * Initializes store. - * - * @param flushThreadCnt Count of flush threads - * @throws Exception If failed. - */ - protected void initStore(int flushThreadCnt) throws Exception { - store = new GridCacheWriteBehindStore<>(null, "", "", log, delegate); - - store.setFlushFrequency(FLUSH_FREQUENCY); - - store.setFlushSize(CACHE_SIZE); - - store.setFlushThreadCount(flushThreadCnt); - - delegate.reset(); - - store.start(); - } - - /** - * Shutdowns store. - * - * @throws Exception If failed. - */ - protected void shutdownStore() throws Exception { - store.stop(); - - assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty()); - } - - /** - * Performs multiple put, get and remove operations in several threads on a store. After - * all threads finished their operations, returns the total set of keys that should be - * in underlying store. - * - * @param threadCnt Count of threads that should update keys. - * @param keysPerThread Count of unique keys assigned to a thread. - * @return Set of keys that was totally put in store. - * @throws Exception If failed. 
- */ - protected Set<Integer> runPutGetRemoveMultithreaded(int threadCnt, final int keysPerThread) throws Exception { - final ConcurrentMap<String, Set<Integer>> perThread = new ConcurrentHashMap<>(); - - final AtomicBoolean running = new AtomicBoolean(true); - - final AtomicInteger cntr = new AtomicInteger(); - - final AtomicInteger operations = new AtomicInteger(); - - IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { - @SuppressWarnings({"NullableProblems"}) - @Override public void run() { - // Initialize key set for this thread. - Set<Integer> set = new HashSet<>(); - - Set<Integer> old = perThread.putIfAbsent(Thread.currentThread().getName(), set); - - if (old != null) - set = old; - - List<Integer> original = new ArrayList<>(); - - Random rnd = new Random(); - - for (int i = 0; i < keysPerThread; i++) - original.add(cntr.getAndIncrement()); - - try { - while (running.get()) { - int op = rnd.nextInt(3); - int idx = rnd.nextInt(keysPerThread); - - int key = original.get(idx); - - switch (op) { - case 0: - store.write(new CacheEntryImpl<>(key, "val" + key)); - set.add(key); - - operations.incrementAndGet(); - - break; - - case 1: - store.delete(key); - set.remove(key); - - operations.incrementAndGet(); - - break; - - case 2: - default: - store.write(new CacheEntryImpl<>(key, "broken")); - - String val = store.load(key); - - assertEquals("Invalid intermediate value: " + val, "broken", val); - - store.write(new CacheEntryImpl<>(key, "val" + key)); - - set.add(key); - - // 2 put operations performed here. 
- operations.incrementAndGet(); - operations.incrementAndGet(); - operations.incrementAndGet(); - - break; - } - } - } - catch (Exception e) { - error("Unexpected exception in put thread", e); - - assert false; - } - } - }, threadCnt, "put"); - - U.sleep(10000); - - running.set(false); - - fut.get(); - - log().info(">>> " + operations + " operations performed totally"); - - Set<Integer> total = new HashSet<>(); - - for (Set<Integer> threadVals : perThread.values()) { - total.addAll(threadVals); - } - - return total; - } -}