http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java new file mode 100644 index 0000000..f9a966c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -0,0 +1,1111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.integration.*; +import java.util.*; + +/** + * Store manager. + */ +@SuppressWarnings({"AssignmentToCatchBlockParameter", "unchecked"}) +public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapter implements CacheStoreManager { + /** */ + private static final UUID SES_ATTR = UUID.randomUUID(); + + /** */ + protected CacheStore<Object, Object> store; + + /** */ + protected CacheStore<?, ?> cfgStore; + + /** */ + private CacheStoreBalancingWrapper<Object, Object> singleThreadGate; + + /** */ + private ThreadLocal<SessionData> sesHolder; + + /** */ + private boolean locStore; + + /** */ + private boolean writeThrough; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException { + GridKernalContext ctx = igniteContext(); + CacheConfiguration cfg = cacheConfiguration(); + + writeThrough = cfg.isWriteThrough(); + + this.cfgStore = cfgStore; + + store = cacheStoreWrapper(ctx, cfgStore, cfg); + + singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store); + + ThreadLocal<SessionData> sesHolder0 = null; + + if (cfgStore != null) { + sesHolder0 = ((Map<CacheStore, ThreadLocal>)sesHolders).get(cfgStore); + + if (sesHolder0 == null) { + ThreadLocalSession locSes = new ThreadLocalSession(); + + if (ctx.resource().injectStoreSession(cfgStore, locSes)) { + sesHolder0 = locSes.sesHolder; + + sesHolders.put(cfgStore, sesHolder0); + } + } + } + + sesHolder = sesHolder0; + + locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class); + } + + /** {@inheritDoc} */ + @Override public boolean isWriteThrough() { + return writeThrough; + } + + /** + * Creates a wrapped cache store if write-behind cache is configured. + * + * @param ctx Kernal context. + * @param cfgStore Store provided in configuration. + * @param cfg Cache configuration. + * @return Instance if {@link GridCacheWriteBehindStore} if write-behind store is configured, + * or user-defined cache store. + */ + @SuppressWarnings({"unchecked"}) + private CacheStore cacheStoreWrapper(GridKernalContext ctx, + @Nullable CacheStore cfgStore, + CacheConfiguration cfg) { + if (cfgStore == null || !cfg.isWriteBehindEnabled()) + return cfgStore; + + GridCacheWriteBehindStore store = new GridCacheWriteBehindStore(this, + ctx.gridName(), + cfg.getName(), + ctx.log(GridCacheWriteBehindStore.class), + cfgStore); + + store.setFlushSize(cfg.getWriteBehindFlushSize()); + store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount()); + store.setFlushFrequency(cfg.getWriteBehindFlushFrequency()); + store.setBatchSize(cfg.getWriteBehindBatchSize()); + + return store; + } + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + if (store instanceof LifecycleAware) { + try { + // Avoid second start() call on store in case when near cache is enabled. + if (cctx.config().isWriteBehindEnabled()) { + if (!cctx.isNear()) + ((LifecycleAware)store).start(); + } + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to start cache store: " + e, e); + } + } + } + + /** {@inheritDoc} */ + @Override protected void stop0(boolean cancel) { + if (store instanceof LifecycleAware) { + try { + // Avoid second start() call on store in case when near cache is enabled. + if (cctx.config().isWriteBehindEnabled()) { + if (!cctx.isNear()) + ((LifecycleAware)store).stop(); + } + } + catch (Exception e) { + U.error(log(), "Failed to stop cache store.", e); + } + } + } + + /** {@inheritDoc} */ + @Override public boolean isLocal() { + return locStore; + } + + /** {@inheritDoc} */ + @Override public boolean configured() { + return store != null; + } + + /** {@inheritDoc} */ + @Override public CacheStore<?, ?> configuredStore() { + return cfgStore; + } + + /** {@inheritDoc} */ + @Override @Nullable public Object load(@Nullable IgniteInternalTx tx, KeyCacheObject key) + throws IgniteCheckedException { + return loadFromStore(tx, key, true); + } + + /** + * Loads data from persistent store. + * + * @param tx Cache transaction. + * @param key Cache key. + * @param convert Convert flag. + * @return Loaded value, possibly <tt>null</tt>. + * @throws IgniteCheckedException If data loading failed. + */ + @Nullable private Object loadFromStore(@Nullable IgniteInternalTx tx, + KeyCacheObject key, + boolean convert) + throws IgniteCheckedException { + if (store != null) { + if (key.internal()) + // Never load internal keys from store as they are never persisted. + return null; + + Object storeKey = key.value(cctx.cacheObjectContext(), false); + + if (convertPortable()) + storeKey = cctx.unwrapPortableIfNeeded(storeKey, false); + + if (log.isDebugEnabled()) + log.debug("Loading value from store for key: " + storeKey); + + sessionInit0(tx); + + boolean thewEx = true; + + Object val = null; + + try { + val = singleThreadGate.load(storeKey); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheLoaderException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheLoaderException(e)); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Loaded value from store [key=" + key + ", val=" + val + ']'); + + if (convert) { + val = convert(val); + + return val; + } + else + return val; + } + + return null; + } + + /** + * @param val Internal value. + * @return User value. + */ + private Object convert(Object val) { + if (val == null) + return null; + + return locStore ? ((IgniteBiTuple<Object, GridCacheVersion>)val).get1() : val; + } + + /** {@inheritDoc} */ + @Override public boolean isWriteToStoreFromDht() { + return cctx.config().isWriteBehindEnabled() || locStore; + } + + /** {@inheritDoc} */ + @Override public void localStoreLoadAll(@Nullable IgniteInternalTx tx, Collection keys, GridInClosure3 vis) + throws IgniteCheckedException { + assert store != null; + assert locStore; + + loadAllFromStore(tx, keys, null, vis); + } + + /** {@inheritDoc} */ + @Override public boolean loadAll(@Nullable IgniteInternalTx tx, Collection keys, IgniteBiInClosure vis) + throws IgniteCheckedException { + if (store != null) { + loadAllFromStore(tx, keys, vis, null); + + return true; + } + else { + for (Object key : keys) + vis.apply(key, null); + } + + return false; + } + + /** + * @param tx Cache transaction. + * @param keys Keys to load. + * @param vis Key/value closure (only one of vis or verVis can be specified). + * @param verVis Key/value/version closure (only one of vis or verVis can be specified). + * @throws IgniteCheckedException If failed. + */ + private void loadAllFromStore(@Nullable IgniteInternalTx tx, + Collection<? extends KeyCacheObject> keys, + @Nullable final IgniteBiInClosure<KeyCacheObject, Object> vis, + @Nullable final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> verVis) + throws IgniteCheckedException { + assert vis != null ^ verVis != null; + assert verVis == null || locStore; + + final boolean convert = verVis == null; + + if (!keys.isEmpty()) { + if (keys.size() == 1) { + KeyCacheObject key = F.first(keys); + + if (convert) + vis.apply(key, load(tx, key)); + else { + IgniteBiTuple<Object, GridCacheVersion> t = + (IgniteBiTuple<Object, GridCacheVersion>)loadFromStore(tx, key, false); + + if (t != null) + verVis.apply(key, t.get1(), t.get2()); + } + + return; + } + + Collection<Object> keys0; + + if (convertPortable()) { + keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() { + @Override public Object apply(KeyCacheObject key) { + return cctx.unwrapPortableIfNeeded(key.value(cctx.cacheObjectContext(), false), false); + } + }); + } + else { + keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() { + @Override public Object apply(KeyCacheObject key) { + return key.value(cctx.cacheObjectContext(), false); + } + }); + } + + if (log.isDebugEnabled()) + log.debug("Loading values from store for keys: " + keys0); + + sessionInit0(tx); + + boolean thewEx = true; + + try { + IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() { + @SuppressWarnings("ConstantConditions") + @Override public void apply(Object k, Object val) { + if (convert) { + Object v = convert(val); + + vis.apply(cctx.toCacheKeyObject(k), v); + } + else { + IgniteBiTuple<Object, GridCacheVersion> v = (IgniteBiTuple<Object, GridCacheVersion>)val; + + if (v != null) + verVis.apply(cctx.toCacheKeyObject(k), v.get1(), v.get2()); + } + } + }; + + if (keys.size() > singleThreadGate.loadAllThreshold()) { + Map<Object, Object> map = store.loadAll(keys0); + + if (map != null) { + for (Map.Entry<Object, Object> e : map.entrySet()) + c.apply(cctx.toCacheKeyObject(e.getKey()), e.getValue()); + } + } + else + singleThreadGate.loadAll(keys0, c); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheLoaderException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheLoaderException(e)); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Loaded values from store for keys: " + keys0); + } + } + + /** {@inheritDoc} */ + @Override public boolean loadCache(final GridInClosure3 vis, Object[] args) throws IgniteCheckedException { + if (store != null) { + if (log.isDebugEnabled()) + log.debug("Loading all values from store."); + + sessionInit0(null); + + boolean thewEx = true; + + try { + store.loadCache(new IgniteBiInClosure<Object, Object>() { + @Override public void apply(Object k, Object o) { + Object v; + GridCacheVersion ver = null; + + if (locStore) { + IgniteBiTuple<Object, GridCacheVersion> t = (IgniteBiTuple<Object, GridCacheVersion>)o; + + v = t.get1(); + ver = t.get2(); + } + else + v = o; + + KeyCacheObject cacheKey = cctx.toCacheKeyObject(k); + + vis.apply(cacheKey, v, ver); + } + }, args); + + thewEx = false; + } + catch (CacheLoaderException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheLoaderException(e)); + } + finally { + sessionEnd0(null, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Loaded all values from store."); + + return true; + } + + LT.warn(log, null, "Calling Cache.loadCache() method will have no effect, " + + "CacheConfiguration.getStore() is not defined for cache: " + cctx.namexx()); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean put(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver) + throws IgniteCheckedException { + if (store != null) { + // Never persist internal keys. + if (key instanceof GridCacheInternal) + return true; + + if (convertPortable()) { + key = cctx.unwrapPortableIfNeeded(key, false); + val = cctx.unwrapPortableIfNeeded(val, false); + } + + if (log.isDebugEnabled()) + log.debug("Storing value in cache store [key=" + key + ", val=" + val + ']'); + + sessionInit0(tx); + + boolean thewEx = true; + + try { + store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val)); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheWriterException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheWriterException(e)); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Stored value in cache store [key=" + key + ", val=" + val + ']'); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean putAll(@Nullable IgniteInternalTx tx, Map map) throws IgniteCheckedException { + if (F.isEmpty(map)) + return true; + + if (map.size() == 1) { + Map.Entry<Object, IgniteBiTuple<Object, GridCacheVersion>> e = + ((Map<Object, IgniteBiTuple<Object, GridCacheVersion>>)map).entrySet().iterator().next(); + + return put(tx, e.getKey(), e.getValue().get1(), e.getValue().get2()); + } + else { + if (store != null) { + EntriesView entries = new EntriesView(map); + + if (log.isDebugEnabled()) + log.debug("Storing values in cache store [entries=" + entries + ']'); + + sessionInit0(tx); + + boolean thewEx = true; + + try { + store.writeAll(entries); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (Exception e) { + if (!(e instanceof CacheWriterException)) + e = new CacheWriterException(e); + + if (!entries.isEmpty()) { + List<Object> keys = new ArrayList<>(entries.size()); + + for (Cache.Entry<?, ?> entry : entries) + keys.add(entry.getKey()); + + throw new CacheStorePartialUpdateException(keys, e); + } + + throw new IgniteCheckedException(e); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Stored value in cache store [entries=" + entries + ']'); + + return true; + } + + return false; + } + } + + /** {@inheritDoc} */ + @Override public boolean remove(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException { + if (store != null) { + // Never remove internal key from store as it is never persisted. + if (key instanceof GridCacheInternal) + return false; + + if (convertPortable()) + key = cctx.unwrapPortableIfNeeded(key, false); + + if (log.isDebugEnabled()) + log.debug("Removing value from cache store [key=" + key + ']'); + + sessionInit0(tx); + + boolean thewEx = true; + + try { + store.delete(key); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheWriterException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheWriterException(e)); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Removed value from cache store [key=" + key + ']'); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) + throws IgniteCheckedException { + if (F.isEmpty(keys)) + return true; + + if (keys.size() == 1) { + Object key = keys.iterator().next(); + + return remove(tx, key); + } + + if (store != null) { + Collection<Object> keys0 = convertPortable() ? cctx.unwrapPortablesIfNeeded(keys, false) : keys; + + if (log.isDebugEnabled()) + log.debug("Removing values from cache store [keys=" + keys0 + ']'); + + sessionInit0(tx); + + boolean thewEx = true; + + try { + store.deleteAll(keys0); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (Exception e) { + if (!(e instanceof CacheWriterException)) + e = new CacheWriterException(e); + + if (!keys0.isEmpty()) + throw new CacheStorePartialUpdateException(keys0, e); + + throw new IgniteCheckedException(e); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Removed values from cache store [keys=" + keys0 + ']'); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public CacheStore<Object, Object> store() { + return store; + } + + /** {@inheritDoc} */ + @Override public void forceFlush() throws IgniteCheckedException { + if (store instanceof GridCacheWriteBehindStore) + ((GridCacheWriteBehindStore)store).forceFlush(); + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException { + assert store != null; + + sessionInit0(tx); + + try { + store.sessionEnd(commit); + } + finally { + if (sesHolder != null) { + sesHolder.set(null); + + tx.removeMeta(SES_ATTR); + } + } + } + + /** + * @param e Class cast exception. + * @throws IgniteCheckedException Thrown exception. + */ + private void handleClassCastException(ClassCastException e) throws IgniteCheckedException { + assert e != null; + + if (e.getMessage() != null) { + throw new IgniteCheckedException("Cache store must work with portable objects if portables are " + + "enabled for cache [cacheName=" + cctx.namex() + ']', e); + } + else + throw e; + } + + /** {@inheritDoc} */ + @Override public void writeBehindSessionInit() { + sessionInit0(null); + } + + /** {@inheritDoc} */ + @Override public void writeBehindSessionEnd(boolean threwEx) throws IgniteCheckedException { + sessionEnd0(null, threwEx); + } + + /** + * @param tx Current transaction. + */ + private void sessionInit0(@Nullable IgniteInternalTx tx) { + if (sesHolder == null) + return; + + assert sesHolder.get() == null; + + SessionData ses; + + if (tx != null) { + ses = tx.meta(SES_ATTR); + + if (ses == null) { + ses = new SessionData(tx, cctx.name()); + + tx.addMeta(SES_ATTR, ses); + } + else + // Session cache name may change in cross-cache transaction. + ses.cacheName(cctx.name()); + } + else + ses = new SessionData(null, cctx.name()); + + sesHolder.set(ses); + } + + /** + * Clears session holder. + */ + private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException { + try { + if (tx == null) + store.sessionEnd(threwEx); + } + catch (Exception e) { + if (!threwEx) + throw U.cast(e); + } + finally { + if (sesHolder != null) + sesHolder.set(null); + } + } + + /** + * @return Ignite context. + */ + protected abstract GridKernalContext igniteContext(); + + /** + * @return Cache configuration. + */ + protected abstract CacheConfiguration cacheConfiguration(); + + /** + * @return Convert-portable flag. + */ + protected abstract boolean convertPortable(); + + /** + * + */ + private static class SessionData { + /** */ + @GridToStringExclude + private final IgniteInternalTx tx; + + /** */ + private String cacheName; + + /** */ + @GridToStringInclude + private Map<Object, Object> props; + + /** + * @param tx Current transaction. + * @param cacheName Cache name. + */ + private SessionData(@Nullable IgniteInternalTx tx, @Nullable String cacheName) { + this.tx = tx; + this.cacheName = cacheName; + } + + /** + * @return Transaction. + */ + @Nullable private Transaction transaction() { + return tx != null ? tx.proxy() : null; + } + + /** + * @return Properties. + */ + private Map<Object, Object> properties() { + if (props == null) + props = new GridLeanMap<>(); + + return props; + } + + /** + * @return Cache name. + */ + private String cacheName() { + return cacheName; + } + + /** + * @param cacheName Cache name. + */ + private void cacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SessionData.class, this, "tx", CU.txString(tx)); + } + } + + /** + * + */ + private static class ThreadLocalSession implements CacheStoreSession { + /** */ + private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>(); + + /** {@inheritDoc} */ + @Nullable @Override public Transaction transaction() { + SessionData ses0 = sesHolder.get(); + + return ses0 != null ? ses0.transaction() : null; + } + + /** {@inheritDoc} */ + @Override public boolean isWithinTransaction() { + return transaction() != null; + } + + /** {@inheritDoc} */ + @Override public <K1, V1> Map<K1, V1> properties() { + SessionData ses0 = sesHolder.get(); + + return ses0 != null ? (Map<K1, V1>)ses0.properties() : null; + } + + /** {@inheritDoc} */ + @Nullable @Override public String cacheName() { + SessionData ses0 = sesHolder.get(); + + return ses0 != null ? ses0.cacheName() : null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ThreadLocalSession.class, this); + } + } + + /** + * + */ + private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> { + /** */ + private final Map<?, IgniteBiTuple<?, GridCacheVersion>> map; + + /** */ + private Set<Object> rmvd; + + /** */ + private boolean cleared; + + /** + * @param map Map. + */ + private EntriesView(Map<?, IgniteBiTuple<?, GridCacheVersion>> map) { + assert map != null; + + this.map = map; + } + + /** {@inheritDoc} */ + @Override public int size() { + return cleared ? 0 : (map.size() - (rmvd != null ? rmvd.size() : 0)); + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + return cleared || !iterator().hasNext(); + } + + /** {@inheritDoc} */ + @Override public boolean contains(Object o) { + if (cleared || !(o instanceof Cache.Entry)) + return false; + + Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o; + + return map.containsKey(e.getKey()); + } + + /** {@inheritDoc} */ + @NotNull @Override public Iterator<Cache.Entry<?, ?>> iterator() { + if (cleared) + return F.emptyIterator(); + + final Iterator<Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>>> it0 = (Iterator)map.entrySet().iterator(); + + return new Iterator<Cache.Entry<?, ?>>() { + /** */ + private Cache.Entry<?, ?> cur; + + /** */ + private Cache.Entry<?, ?> next; + + /** + * + */ + { + checkNext(); + } + + /** + * + */ + private void checkNext() { + while (it0.hasNext()) { + Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>> e = it0.next(); + + Object k = e.getKey(); + + if (rmvd != null && rmvd.contains(k)) + continue; + + Object v = locStore ? e.getValue() : e.getValue().get1(); + + if (convertPortable()) { + k = cctx.unwrapPortableIfNeeded(k, false); + v = cctx.unwrapPortableIfNeeded(v, false); + } + + next = new CacheEntryImpl<>(k, v); + + break; + } + } + + @Override public boolean hasNext() { + return next != null; + } + + @Override public Cache.Entry<?, ?> next() { + if (next == null) + throw new NoSuchElementException(); + + cur = next; + + next = null; + + checkNext(); + + return cur; + } + + @Override public void remove() { + if (cur == null) + throw new IllegalStateException(); + + addRemoved(cur); + + cur = null; + } + }; + } + + /** {@inheritDoc} */ + @Override public boolean add(Cache.Entry<?, ?> entry) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean addAll(Collection<? extends Cache.Entry<?, ?>> col) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean remove(Object o) { + if (cleared || !(o instanceof Cache.Entry)) + return false; + + Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o; + + if (rmvd != null && rmvd.contains(e.getKey())) + return false; + + if (mapContains(e)) { + addRemoved(e); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean containsAll(Collection<?> col) { + if (cleared) + return false; + + for (Object o : col) { + if (contains(o)) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean removeAll(Collection<?> col) { + if (cleared) + return false; + + boolean modified = false; + + for (Object o : col) { + if (remove(o)) + modified = true; + } + + return modified; + } + + /** {@inheritDoc} */ + @Override public boolean retainAll(Collection<?> col) { + if (cleared) + return false; + + boolean modified = false; + + for (Cache.Entry<?, ?> e : this) { + if (!col.contains(e)) { + addRemoved(e); + + modified = true; + } + } + + return modified; + } + + /** {@inheritDoc} */ + @Override public void clear() { + cleared = true; + } + + /** + * @param e Entry. + */ + private void addRemoved(Cache.Entry<?, ?> e) { + if (rmvd == null) + rmvd = new HashSet<>(); + + rmvd.add(e.getKey()); + } + + /** + * @param e Entry. + * @return {@code True} if original map contains entry. + */ + private boolean mapContains(Cache.Entry<?, ?> e) { + return map.containsKey(e.getKey()); + } + + /** {@inheritDoc} */ + public String toString() { + Iterator<Cache.Entry<?, ?>> it = iterator(); + + if (!it.hasNext()) + return "[]"; + + SB sb = new SB("["); + + while (true) { + Cache.Entry<?, ?> e = it.next(); + + sb.a(e.toString()); + + if (!it.hasNext()) + return sb.a(']').toString(); + + sb.a(", "); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java index 9a35d5e..250aaa9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java @@ -123,7 +123,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy private IgniteLogger log; /** Store manager. */ - private GridCacheOsStoreManager storeMgr; + private CacheStoreManager storeMgr; /** * Creates a write-behind cache store for the given store. @@ -135,7 +135,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy * @param store {@code GridCacheStore} that need to be wrapped. */ public GridCacheWriteBehindStore( - GridCacheOsStoreManager storeMgr, + GridCacheStoreManagerAdapter storeMgr, String gridName, String cacheName, IgniteLogger log, @@ -663,7 +663,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy boolean initSes) { if (initSes && storeMgr != null) - storeMgr.initSession(null); + storeMgr.writeBehindSessionInit(); try { boolean threwEx = true; @@ -690,7 +690,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } finally { if (initSes && storeMgr != null) - storeMgr.endSession(null, threwEx); + storeMgr.writeBehindSessionEnd(threwEx); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 4dca9e4..1b66b4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -445,11 +445,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter * * @return Store manager. */ - protected GridCacheOsStoreManager store() { + protected CacheStoreManager store() { if (!activeCacheIds().isEmpty()) { int cacheId = F.first(activeCacheIds()); - GridCacheOsStoreManager store = cctx.cacheContext(cacheId).store(); + CacheStoreManager store = cctx.cacheContext(cacheId).store(); return store.configured() ? store : null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index e051385..2c91e65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -371,7 +371,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } return new GridFinishedFuture<>( - cacheCtx.store().loadAllFromStore(this, keys, c)); + cacheCtx.store().loadAll(this, keys, c)); } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); @@ -388,7 +388,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return false; } - return cacheCtx.store().loadAllFromStore(IgniteTxLocalAdapter.this, keys, c); + return cacheCtx.store().loadAll(IgniteTxLocalAdapter.this, keys, c); } }, true); @@ -493,17 +493,17 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter */ @SuppressWarnings({"CatchGenericClass"}) protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException { - GridCacheOsStoreManager store = store(); + CacheStoreManager store = store(); - if (store != null && store.writeThrough() && storeEnabled() && - (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) { + if (store != null && store.isWriteThrough() && storeEnabled() && + (!internal() || groupLock()) && (near() || store.isWriteToStoreFromDht())) { try { if (writeEntries != null) { Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null; List<Object> rmvCol = null; - GridCacheOsStoreManager writeStore = null; + CacheStoreManager writeStore = null; - boolean skipNear = near() && store.writeToStoreFromDht(); + boolean skipNear = near() && store.isWriteToStoreFromDht(); for (IgniteTxEntry e : writeEntries) { if (skipNear && e.cached().isNear()) @@ -528,7 +528,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (rmvCol != null && !rmvCol.isEmpty()) { assert writeStore != null; - writeStore.removeAllFromStore(this, rmvCol); + writeStore.removeAll(this, rmvCol); // Reset. rmvCol.clear(); @@ -538,7 +538,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // Batch-process puts if cache ID has changed. if (writeStore != null && writeStore != cacheCtx.store() && putMap != null && !putMap.isEmpty()) { - writeStore.putAllToStore(this, putMap); + writeStore.putAll(this, putMap); // Reset. putMap.clear(); @@ -569,7 +569,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (putMap != null && !putMap.isEmpty()) { assert writeStore != null; - writeStore.putAllToStore(this, putMap); + writeStore.putAll(this, putMap); // Reset. putMap.clear(); @@ -578,7 +578,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } if (writeStore != null && writeStore != cacheCtx.store() && rmvCol != null && !rmvCol.isEmpty()) { - writeStore.removeAllFromStore(this, rmvCol); + writeStore.removeAll(this, rmvCol); // Reset. rmvCol.clear(); @@ -610,7 +610,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert writeStore != null; // Batch put at the end of transaction. - writeStore.putAllToStore(this, putMap); + writeStore.putAll(this, putMap); } if (rmvCol != null && !rmvCol.isEmpty()) { @@ -618,12 +618,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert writeStore != null; // Batch remove at the end of transaction. - writeStore.removeAllFromStore(this, rmvCol); + writeStore.removeAll(this, rmvCol); } } // Commit while locks are held. - store.txEnd(this, true); + store.sessionEnd(this, true); } catch (IgniteCheckedException ex) { commitError(ex); @@ -982,11 +982,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else { - GridCacheOsStoreManager store = store(); + CacheStoreManager store = store(); if (store != null && (!internal() || groupLock())) { try { - store.txEnd(this, true); + store.sessionEnd(this, true); } catch (IgniteCheckedException e) { commitError(e); @@ -1087,11 +1087,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx.tm().rollbackTx(this); - GridCacheOsStoreManager store = store(); + CacheStoreManager store = store(); - if (store != null && (near() || store.writeToStoreFromDht())) { + if (store != null && (near() || store.isWriteToStoreFromDht())) { if (!internal() || groupLock()) - store.txEnd(this, false); + store.sessionEnd(this, false); } } catch (Error | IgniteCheckedException | RuntimeException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index 42586d2..6a8117f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -156,10 +156,4 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { * with {@link IgniteImmutable} annotation. */ public boolean immutable(Object obj); - - /** - * @param cacheName Cache name. - * @return {@code True} if portable format should be preserved when passing values to cache store. - */ - public boolean keepPortableInStore(@Nullable String cacheName); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index f65b7bd..6e46757 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -234,11 +234,6 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** {@inheritDoc} */ - @Override public boolean keepPortableInStore(@Nullable String cacheName) { - return false; - } - - /** {@inheritDoc} */ @Override public void onCacheProcessorStarted() { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java index 69ca1ae..580ff49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java @@ -22,6 +22,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.dr.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.plugin.*; import java.util.*; @@ -76,10 +77,13 @@ public class CachePluginManager extends GridCacheManagerAdapter { /** * Creates optional component. * + * @param ctx Kernal context. + * @param cfg Cache configuration. * @param cls Component class. * @return Created component. */ - public <T> T createComponent(Class<T> cls) { + @SuppressWarnings("unchecked") + public <T> T createComponent(GridKernalContext ctx, CacheConfiguration cfg, Class<T> cls) { for (CachePluginProvider provider : providers) { T res = (T)provider.createComponent(cls); @@ -91,6 +95,8 @@ public class CachePluginManager extends GridCacheManagerAdapter { return (T)new GridOsCacheDrManager(); else if (cls.equals(CacheConflictResolutionManager.class)) return (T)new CacheOsConflictResolutionManager<>(); + else if (cls.equals(CacheStoreManager.class)) + return (T)new CacheOsStoreManager(ctx, cfg); throw new IgniteException("Unsupported component type: " + cls); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 32e0caf..8bb4d5c 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -389,10 +389,10 @@ org.apache.ignite.internal.processors.cache.GridCacheProcessor$LocalAffinityFunc org.apache.ignite.internal.processors.cache.GridCacheProjectionImpl org.apache.ignite.internal.processors.cache.GridCacheProxyImpl org.apache.ignite.internal.processors.cache.GridCacheReturn -org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$1 -org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$2 -org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$3 -org.apache.ignite.internal.processors.cache.store.GridCacheOsStoreManager$4 +org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$1 +org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$2 +org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$3 +org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$4 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$10 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$12 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$14 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/038f610e/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 41f23ff..b277d48 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -60,10 +60,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { true, new GridCacheEventManager(), new GridCacheSwapManager(false), - new GridCacheOsStoreManager(null, - new IdentityHashMap<CacheStore, ThreadLocal>(), - null, - new CacheConfiguration()), + new CacheOsStoreManager(null, new CacheConfiguration()), new GridCacheEvictionManager(), new GridCacheLocalQueryManager<K, V>(), new CacheContinuousQueryManager(), @@ -75,5 +72,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { new CacheOsConflictResolutionManager<K, V>(), new CachePluginManager(ctx, new CacheConfiguration()) ); + + store().initialize(null, new IdentityHashMap<CacheStore, ThreadLocal>()); } }