http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 0000000,a6f8301..0edbe0d mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@@ -1,0 -1,5338 +1,5689 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.cache.affinity.*; + import org.apache.ignite.cache.datastructures.*; + import org.apache.ignite.cache.query.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.compute.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.fs.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.util.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.plugin.security.*; + import org.apache.ignite.portables.*; + import org.apache.ignite.resources.*; + import org.apache.ignite.transactions.*; + import org.apache.ignite.internal.processors.cache.affinity.*; + import org.apache.ignite.internal.processors.cache.datastructures.*; + import org.apache.ignite.internal.processors.cache.distributed.dht.*; + import org.apache.ignite.internal.processors.cache.dr.*; + import org.apache.ignite.internal.processors.cache.query.*; + import org.apache.ignite.internal.processors.cache.transactions.*; + import org.apache.ignite.internal.processors.dr.*; + import org.apache.ignite.internal.processors.task.*; + import org.apache.ignite.internal.util.future.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jdk8.backport.*; + import org.jetbrains.annotations.*; + + import javax.cache.expiry.*; + import javax.cache.processor.*; + import java.io.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.locks.*; + + import static java.util.Collections.*; + import static org.apache.ignite.IgniteSystemProperties.*; + import static org.apache.ignite.events.IgniteEventType.*; + import static org.apache.ignite.cache.CacheFlag.*; + import static org.apache.ignite.cache.GridCachePeekMode.*; + import static org.apache.ignite.transactions.IgniteTxConcurrency.*; + import static org.apache.ignite.transactions.IgniteTxIsolation.*; + import static org.apache.ignite.internal.GridClosureCallMode.*; + import static org.apache.ignite.internal.processors.dr.GridDrType.*; + import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*; + + /** + * Adapter for different cache implementations. + */ + @SuppressWarnings("unchecked") + public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, + GridCacheProjectionEx<K, V>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** clearAll() split threshold. */ + public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000; + + /** Deserialization stash. */ + private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String, + String>>() { + @Override protected IgniteBiTuple<String, String> initialValue() { + return F.t2(); + } + }; + + /** {@link GridCacheReturn}-to-value conversion. */ + private static final IgniteClosure RET2VAL = + new CX1<IgniteFuture<GridCacheReturn<Object>>, Object>() { + @Nullable @Override public Object applyx(IgniteFuture<GridCacheReturn<Object>> fut) throws IgniteCheckedException { + return fut.get().value(); + } + + @Override public String toString() { + return "Cache return value to value converter."; + } + }; + + /** {@link GridCacheReturn}-to-success conversion. */ + private static final IgniteClosure RET2FLAG = + new CX1<IgniteFuture<GridCacheReturn<Object>>, Boolean>() { + @Override public Boolean applyx(IgniteFuture<GridCacheReturn<Object>> fut) throws IgniteCheckedException { + return fut.get().success(); + } + + @Override public String toString() { + return "Cache return value to boolean flag converter."; + } + }; + + /** */ + protected boolean keyCheck = !Boolean.getBoolean(GG_CACHE_KEY_VALIDATION_DISABLED); + + /** */ + private boolean valCheck = true; + + /** Last asynchronous future. */ + protected ThreadLocal<FutureHolder> lastFut = new ThreadLocal<FutureHolder>() { + @Override protected FutureHolder initialValue() { + return new FutureHolder(); + } + }; + + /** Cache configuration. */ + @GridToStringExclude + protected GridCacheContext<K, V> ctx; + + /** Local map. */ + @GridToStringExclude + protected GridCacheConcurrentMap<K, V> map; + + /** Local node ID. */ + @GridToStringExclude + protected UUID locNodeId; + + /** Cache configuration. */ + @GridToStringExclude + protected CacheConfiguration cacheCfg; + + /** Grid configuration. */ + @GridToStringExclude + protected IgniteConfiguration gridCfg; + + /** Cache metrics. */ - protected volatile GridCacheMetricsAdapter metrics; ++ protected CacheMetricsAdapter metrics; ++ ++ /** Cache mxBean. */ ++ protected IgniteCacheMxBean mxBean; + + /** Logger. */ + protected IgniteLogger log; + + /** Queries impl. */ + private CacheQueries<K, V> qry; + + /** Data structures impl. */ + private CacheDataStructures dataStructures; + + /** Affinity impl. */ + private CacheAffinity<K> aff; + + /** Whether this cache is GGFS data cache. */ + private boolean ggfsDataCache; + + /** Whether this cache is Mongo data cache. */ + @SuppressWarnings("UnusedDeclaration") + private boolean mongoDataCache; + + /** Whether this cache is Mongo meta cache. */ + @SuppressWarnings("UnusedDeclaration") + private boolean mongoMetaCache; + + /** Current GGFS data cache size. */ + private LongAdder ggfsDataCacheSize; + + /** Max space for GGFS. */ + private long ggfsDataSpaceMax; + + /** Asynchronous operations limit semaphore. */ + private Semaphore asyncOpsSem; + + /** {@inheritDoc} */ + @Override public String name() { + return ctx.config().getName(); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup gridProjection() { + return ctx.grid().forCache(name()); + } + + /** + * Empty constructor required by {@link Externalizable}. + */ + protected GridCacheAdapter() { + // No-op. + } + + /** + * @param ctx Cache context. + * @param startSize Start size. + */ + @SuppressWarnings("OverriddenMethodCallDuringObjectConstruction") + protected GridCacheAdapter(GridCacheContext<K, V> ctx, int startSize) { + this(ctx, new GridCacheConcurrentMap<>(ctx, startSize, 0.75F)); + } + + /** + * @param ctx Cache context. + * @param map Concurrent map. + */ + @SuppressWarnings("OverriddenMethodCallDuringObjectConstruction") + protected GridCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) { + assert ctx != null; + + this.ctx = ctx; + + gridCfg = ctx.gridConfig(); + cacheCfg = ctx.config(); + + locNodeId = ctx.gridConfig().getNodeId(); + + this.map = map; + + log = ctx.gridConfig().getGridLogger().getLogger(getClass()); + - metrics = new GridCacheMetricsAdapter(); ++ metrics = new CacheMetricsAdapter(ctx); ++ ++ mxBean = new CacheMxBeanImpl(this); + + IgniteFsConfiguration[] ggfsCfgs = gridCfg.getGgfsConfiguration(); + + if (ggfsCfgs != null) { + for (IgniteFsConfiguration ggfsCfg : ggfsCfgs) { + if (F.eq(ctx.name(), ggfsCfg.getDataCacheName())) { + if (!ctx.isNear()) { + ggfsDataCache = true; + ggfsDataCacheSize = new LongAdder(); + + ggfsDataSpaceMax = ggfsCfg.getMaxSpaceSize(); + + if (ggfsDataSpaceMax == 0) { + long maxMem = Runtime.getRuntime().maxMemory(); + + // We leave JVM at least 500M of memory for correct operation. + long jvmFreeSize = (maxMem - 512 * 1024 * 1024); + + if (jvmFreeSize <= 0) + jvmFreeSize = maxMem / 2; + + long dfltMaxSize = (long)(0.8f * maxMem); + + ggfsDataSpaceMax = Math.min(dfltMaxSize, jvmFreeSize); + } + } + + break; + } + } + } + + if (ctx.config().getMaxConcurrentAsyncOperations() > 0) + asyncOpsSem = new Semaphore(ctx.config().getMaxConcurrentAsyncOperations()); + + init(); + + qry = new GridCacheQueriesImpl<>(ctx, null); + dataStructures = new GridCacheDataStructuresImpl<>(ctx); + aff = new GridCacheAffinityImpl<>(ctx); + } + + /** + * Prints memory stats. + */ + public void printMemoryStats() { + if (ctx.isNear()) { + X.println(">>> Near cache size: " + size()); + + ctx.near().dht().printMemoryStats(); + } + else if (ctx.isDht()) + X.println(">>> DHT cache size: " + size()); + else + X.println(">>> Cache size: " + size()); + } + + /** + * @return Base map. + */ + public GridCacheConcurrentMap<K, V> map() { + return map; + } + + /** + * @return Context. + */ + public GridCacheContext<K, V> context() { + return ctx; + } + + /** + * @return Logger. + */ + protected IgniteLogger log() { + return log; + } + + /** + * @return {@code True} if this is near cache. + */ + public boolean isNear() { + return false; + } + + /** + * @return {@code True} if cache is local. + */ + public boolean isLocal() { + return false; + } + + /** + * @return {@code True} if cache is colocated. + */ + public boolean isColocated() { + return false; + } + + /** + * @return {@code True} if cache is DHT Atomic. + */ + public boolean isDhtAtomic() { + return false; + } + + /** + * @return {@code True} if cache is DHT. + */ + public boolean isDht() { + return false; + } + + /** + * @return Preloader. + */ + public abstract GridCachePreloader<K, V> preloader(); + + /** {@inheritDoc} */ + @Override public CacheQueries<K, V> queries() { + return qry; + } + + /** {@inheritDoc} */ + @Override public CacheAffinity<K> affinity() { + return aff; + } + + /** {@inheritDoc} */ + @Override public CacheDataStructures dataStructures() { + return dataStructures; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "RedundantCast"}) + @Override public <K1, V1> GridCache<K1, V1> cache() { + return (GridCache<K1, V1>)this; + } + + /** {@inheritDoc} */ + @Override public Set<CacheFlag> flags() { + return F.asSet(ctx.forcedFlags()); + } + + /** {@inheritDoc} */ + @Override public IgnitePredicate<CacheEntry<K, V>> predicate() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridCacheProjectionEx<K, V> forSubjectId(UUID subjId) { + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + ctx, + null, + null, + null, + subjId, + false, + null); + + return new GridCacheProxyImpl<>(ctx, prj, prj); + } + + /** {@inheritDoc} */ + @Override public CacheProjection<K, V> flagsOn(@Nullable CacheFlag[] flags) { + if (F.isEmpty(flags)) + return this; + + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + ctx, + null, + null, + EnumSet.copyOf(F.asList(flags)), + null, + false, + null); + + return new GridCacheProxyImpl<>(ctx, prj, prj); + } + + /** {@inheritDoc} */ + @Override public CacheProjection<K, V> flagsOff(@Nullable CacheFlag[] flags) { + return this; + } + + /** {@inheritDoc} */ + @Override public <K1, V1> CacheProjection<K1, V1> keepPortable() { + GridCacheProjectionImpl<K1, V1> prj = keepPortable0(); + + return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj); + } + + /** + * Internal routine to get "keep-portable" projection. + * + * @return Projection with "keep-portable" flag. + */ + public <K1, V1> GridCacheProjectionImpl<K1, V1> keepPortable0() { + return new GridCacheProjectionImpl<>( + (CacheProjection<K1, V1>)this, + (GridCacheContext<K1, V1>)ctx, + null, + null, + null, + null, + ctx.portableEnabled(), + null + ); + } + + /** {@inheritDoc} */ + @Nullable @Override public ExpiryPolicy expiry() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc) { + return new GridCacheProjectionImpl<>( + this, + ctx, + null, + null, + null, + null, + ctx.portableEnabled(), + plc); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "RedundantCast"}) + @Override public <K1, V1> CacheProjection<K1, V1> projection( + Class<? super K1> keyType, + Class<? super V1> valType + ) { + if (PortableObject.class.isAssignableFrom(keyType) || PortableObject.class.isAssignableFrom(valType)) + throw new IllegalStateException("Failed to create cache projection for portable objects. " + + "Use keepPortable() method instead."); + + if (ctx.deploymentEnabled()) { + try { + ctx.deploy().registerClasses(keyType, valType); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>((CacheProjection<K1, V1>)this, + (GridCacheContext<K1, V1>)ctx, + CU.<K1, V1>typeFilter(keyType, valType), + /*filter*/null, + /*flags*/null, + /*clientId*/null, + false, + null); + + return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj); + } + + /** {@inheritDoc} */ + @Override public CacheProjection<K, V> projection(IgniteBiPredicate<K, V> p) { + if (p == null) + return this; + + if (ctx.deploymentEnabled()) { + try { + ctx.deploy().registerClasses(p); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + ctx, + p, + null, + null, + null, + false, + null); + + return new GridCacheProxyImpl<>(ctx, prj, prj); + } + + /** {@inheritDoc} */ + @Override public CacheProjection<K, V> projection(IgnitePredicate<CacheEntry<K, V>> filter) { + if (filter == null) + return this; + + if (ctx.deploymentEnabled()) { + try { + ctx.deploy().registerClasses(filter); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>( + this, + ctx, + null, + filter, + null, + null, + false, + null); + + return new GridCacheProxyImpl<>(ctx, prj, prj); + } + + /** {@inheritDoc} */ + @Override public CacheConfiguration configuration() { + return ctx.config(); + } + + /** + * @param keys Keys to lock. + * @param timeout Lock timeout. + * @param tx Transaction. + * @param isRead {@code True} for read operations. + * @param retval Flag to return value. + * @param isolation Transaction isolation. + * @param invalidate Invalidate flag. + * @param accessTtl TTL for read operation. + * @param filter Optional filter. + * @return Locks future. + */ + public abstract IgniteFuture<Boolean> txLockAsync( + Collection<? extends K> keys, + long timeout, + IgniteTxLocalEx<K, V> tx, + boolean isRead, + boolean retval, + IgniteTxIsolation isolation, + boolean invalidate, + long accessTtl, + IgnitePredicate<CacheEntry<K, V>>[] filter); + + /** + * Post constructor initialization for subclasses. + */ + protected void init() { + // No-op. + } + + /** + * Starts this cache. Child classes should override this method + * to provide custom start-up behavior. + * + * @throws IgniteCheckedException If start failed. + */ + public void start() throws IgniteCheckedException { + // No-op. + } + + /** + * Startup info. + * + * @return Startup info. + */ + protected final String startInfo() { + return "Cache started: " + ctx.config().getName(); + } + + /** + * Stops this cache. Child classes should override this method + * to provide custom stop behavior. + */ + public void stop() { + // Nulling thread local reference to ensure values will be eventually GCed + // no matter what references these futures are holding. + lastFut = null; + } + + /** + * Stop info. + * + * @return Stop info. + */ + protected final String stopInfo() { + return "Cache stopped: " + ctx.config().getName(); + } + + /** + * Kernal start callback. + * + * @throws IgniteCheckedException If callback failed. + */ + protected void onKernalStart() throws IgniteCheckedException { + // No-op. + } + + /** + * Kernal stop callback. + */ + public void onKernalStop() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + return values().isEmpty(); + } + + /** {@inheritDoc} */ + @Override public boolean containsKey(K key) { + return containsKey(key, null); + } + + /** {@inheritDoc} */ + @Override public boolean containsValue(V val) { + return containsValue(val, null); + } + + /** {@inheritDoc} */ + @Override public V peek(K key) { + return peek(key, (IgnitePredicate<CacheEntry<K, V>>)null); + } + + /** {@inheritDoc} */ + @Override public V peek(K key, @Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException { + return peek0(key, modes, ctx.tm().localTxx()); + } + + /** {@inheritDoc} */ + public Map<K, V> peekAll(@Nullable Collection<? extends K> keys, + @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { + return peekAll0(keys, filter, null); + } + + /** + * @param failFast Fail fast flag. + * @param key Key. + * @param mode Peek mode. + * @param filter Filter. + * @return Peeked value. + * @throws GridCacheFilterFailedException If filter failed. + */ + @Nullable protected GridTuple<V> peek0(boolean failFast, K key, GridCachePeekMode mode, + @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws GridCacheFilterFailedException { + A.notNull(key, "key"); + + if (keyCheck) + validateCacheKey(key); + + ctx.checkSecurity(GridSecurityPermission.CACHE_READ); + + GridCacheEntryEx<K, V> e = null; + + try { + if (ctx.portableEnabled()) + key = (K)ctx.marshalToPortable(key); + + e = peekEx(key); + + if (e != null) { + GridTuple<V> peek = e.peek0(failFast, mode, filter, ctx.tm().localTxx()); + + if (peek != null) { + V v = peek.get(); + + if (ctx.portableEnabled()) + v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable()); + + return F.t(ctx.cloneOnFlag(v)); + } + } + + IgniteTxEx<K, V> tx = ctx.tm().localTx(); + + if (tx != null) { + GridTuple<V> peek = tx.peek(ctx, failFast, key, filter); + + if (peek != null) { + V v = peek.get(); + + if (ctx.portableEnabled()) + v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable()); + + return F.t(ctx.cloneOnFlag(v)); + } + } + + return null; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during 'peek': " + e); + + return null; + } + catch (IgniteCheckedException ex) { + throw new IgniteException(ex); + } + } + + /** + * @param keys Keys. + * @param filter Filter. + * @param skipped Skipped keys, possibly {@code null}. + * @return Peeked map. + */ + protected Map<K, V> peekAll0(@Nullable Collection<? extends K> keys, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter, @Nullable Collection<K> skipped) { + if (F.isEmpty(keys)) + return Collections.emptyMap(); + + if (keyCheck) + validateCacheKeys(keys); + + ctx.checkSecurity(GridSecurityPermission.CACHE_READ); + + Map<K, V> ret = new HashMap<>(keys.size(), 1.0f); + + for (K k : keys) { + GridCacheEntryEx<K, V> e = peekEx(k); + + if (e != null) + try { + ret.put(k, ctx.cloneOnFlag(e.peekFailFast(SMART, filter))); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during 'peek' (will skip): " + e); + } + catch (GridCacheFilterFailedException ignore) { + if (log.isDebugEnabled()) + log.debug("Filter failed during peek (will skip): " + e); + + if (skipped != null) + skipped.add(k); + } + catch (IgniteCheckedException ex) { + throw new IgniteException(ex); + } + } + + return ret; + } + + /** + * @param key Key. + * @param modes Peek modes. + * @param tx Transaction to peek at (if modes contains TX value). + * @return Peeked value. + * @throws IgniteCheckedException In case of error. + */ + @Nullable protected V peek0(K key, @Nullable Collection<GridCachePeekMode> modes, IgniteTxEx<K, V> tx) + throws IgniteCheckedException { + try { + GridTuple<V> peek = peek0(false, key, modes, tx); + + return peek != null ? peek.get() : null; + } + catch (GridCacheFilterFailedException ex) { + ex.printStackTrace(); + + assert false; // Should never happen. + + return null; + } + } + + /** + * @param failFast If {@code true}, then filter failure will result in exception. + * @param key Key. + * @param modes Peek modes. + * @param tx Transaction to peek at (if modes contains TX value). + * @return Peeked value. + * @throws IgniteCheckedException In case of error. + * @throws GridCacheFilterFailedException If filer validation failed. + */ + @Nullable protected GridTuple<V> peek0(boolean failFast, K key, @Nullable Collection<GridCachePeekMode> modes, + IgniteTxEx<K, V> tx) throws IgniteCheckedException, GridCacheFilterFailedException { + if (F.isEmpty(modes)) + return F.t(peek(key, (IgnitePredicate<CacheEntry<K, V>>)null)); + + assert modes != null; + + A.notNull(key, "key"); + + if (keyCheck) + validateCacheKey(key); + + ctx.checkSecurity(GridSecurityPermission.CACHE_READ); + + GridCacheEntryEx<K, V> e = peekEx(key); + + try { + for (GridCachePeekMode m : modes) { + GridTuple<V> val = null; + + if (e != null) + val = e.peek0(failFast, m, null, tx); + else if (m == TX || m == SMART) + val = tx != null ? tx.peek(ctx, failFast, key, null) : null; + else if (m == SWAP) + val = peekSwap(key); + else if (m == DB) + val = peekDb(key); + + if (val != null) + return F.t(ctx.cloneOnFlag(val.get())); + } + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during 'peek': " + e); + } + + return null; + } + + /** + * @param key Key to read from swap storage. + * @return Value from swap storage. + * @throws IgniteCheckedException In case of any errors. + */ + @Nullable private GridTuple<V> peekSwap(K key) throws IgniteCheckedException { + GridCacheSwapEntry<V> e = ctx.swap().read(key); + + return e != null ? F.t(e.value()) : null; + } + + /** + * @param key Key to read from persistent store. + * @return Value from persistent store. + * @throws IgniteCheckedException In case of any errors. + */ + @Nullable private GridTuple<V> peekDb(K key) throws IgniteCheckedException { + V val = ctx.store().loadFromStore(ctx.tm().localTxx(), key); + + return val != null ? F.t(val) : null; + } + + /** + * @param keys Keys. + * @param modes Modes. + * @param tx Transaction. + * @param skipped Keys skipped during filter validation. + * @return Peeked values. + * @throws IgniteCheckedException If failed. + */ + protected Map<K, V> peekAll0(@Nullable Collection<? extends K> keys, @Nullable Collection<GridCachePeekMode> modes, + IgniteTxEx<K, V> tx, @Nullable Collection<K> skipped) throws IgniteCheckedException { + if (F.isEmpty(keys)) + return emptyMap(); + + if (keyCheck) + validateCacheKeys(keys); + + Map<K, V> ret = new HashMap<>(keys.size(), 1.0f); + + for (K k : keys) { + try { + GridTuple<V> val = peek0(skipped != null, k, modes, tx); + + if (val != null) + ret.put(k, val.get()); + } + catch (GridCacheFilterFailedException ignored) { + if (log.isDebugEnabled()) + log.debug("Filter validation failed for key: " + k); + + if (skipped != null) + skipped.add(k); + } + } + + return ret; + } + + /** + * Pokes an entry. + * + * @param key Key. + * @param newVal New values. + * @return {@code True} if entry was poked. + * @throws IgniteCheckedException If failed. + */ + private boolean poke0(K key, @Nullable V newVal) throws IgniteCheckedException { + GridCacheEntryEx<K, V> entryEx = peekEx(key); + + if (entryEx == null || entryEx.deleted()) + return newVal == null; + + if (newVal == null) + return entryEx.markObsolete(ctx.versions().next()); + + try { + entryEx.poke(newVal); + } + catch (GridCacheEntryRemovedException ignore) { + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public void forEach(IgniteInClosure<CacheEntry<K, V>> vis) { + A.notNull(vis, "vis"); + + for (CacheEntry<K, V> e : entrySet()) + vis.apply(e); + } + + /** {@inheritDoc} */ + @Override public boolean forAll(IgnitePredicate<CacheEntry<K, V>> vis) { + A.notNull(vis, "vis"); + + for (CacheEntry<K, V> e : entrySet()) + if (!vis.apply(e)) + return false; + + return true; + } + + /** + * Undeploys and removes all entries for class loader. + * + * @param leftNodeId Left node ID. + * @param ldr Class loader to undeploy. + */ + public void onUndeploy(@Nullable UUID leftNodeId, ClassLoader ldr) { + ctx.deploy().onUndeploy(leftNodeId, ldr); + } + + /** {@inheritDoc} */ + @Nullable @Override public CacheEntry<K, V> entry(K key) { + A.notNull(key, "key"); + + if (keyCheck) + validateCacheKey(key); + + return entryEx(key, true).wrap(true); + } + + /** + * + * @param key Entry key. + * @return Entry or <tt>null</tt>. + */ + @Nullable public GridCacheEntryEx<K, V> peekEx(K key) { + return entry0(key, ctx.affinity().affinityTopologyVersion(), false, false); + } + + /** + * @param key Entry key. + * @return Entry (never {@code null}). + */ + public GridCacheEntryEx<K, V> entryEx(K key) { + return entryEx(key, false); + } + + /** + * @param key Entry key. + * @param touch Whether created entry should be touched. + * @return Entry (never {@code null}). + */ + public GridCacheEntryEx<K, V> entryEx(K key, boolean touch) { + GridCacheEntryEx<K, V> e = entry0(key, ctx.affinity().affinityTopologyVersion(), true, touch); + + assert e != null; + + return e; + } + + /** + * @param topVer Topology version. + * @param key Entry key. + * @return Entry (never {@code null}). + */ + public GridCacheEntryEx<K, V> entryEx(K key, long topVer) { + GridCacheEntryEx<K, V> e = entry0(key, topVer, true, false); + + assert e != null; + + return e; + } + + /** + * @param key Entry key. + * @param topVer Topology version at the time of creation. + * @param create Flag to create entry if it does not exist. + * @param touch Flag to touch created entry (only if entry was actually created). + * @return Entry or <tt>null</tt>. + */ + @Nullable private GridCacheEntryEx<K, V> entry0(K key, long topVer, boolean create, boolean touch) { + GridTriple<GridCacheMapEntry<K, V>> t = map.putEntryIfObsoleteOrAbsent(topVer, key, null, + ctx.config().getDefaultTimeToLive(), create); + + GridCacheEntryEx<K, V> cur = t.get1(); + GridCacheEntryEx<K, V> created = t.get2(); + GridCacheEntryEx<K, V> doomed = t.get3(); + + if (doomed != null && ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED)) + // Event notification. + ctx.events().addEvent(doomed.partition(), doomed.key(), locNodeId, (IgniteUuid)null, null, + EVT_CACHE_ENTRY_DESTROYED, null, false, null, false, null, null, null); + + if (created != null) { + // Event notification. + if (ctx.events().isRecordable(EVT_CACHE_ENTRY_CREATED)) + ctx.events().addEvent(created.partition(), created.key(), locNodeId, (IgniteUuid)null, null, + EVT_CACHE_ENTRY_CREATED, null, false, null, false, null, null, null); + + if (touch) + ctx.evicts().touch(cur, topVer); + } + + return cur; + } + + /** + * Same as {@link #entrySet()} but for internal use only to + * avoid casting. + * + * @return Set of entry wrappers. + */ + public Set<GridCacheEntryImpl<K, V>> wrappers() { + return map.wrappers(CU.<K, V>empty()); + } + + /** + * @return Set of internal cached entry representations, excluding {@link GridCacheInternal} keys. + */ + public Set<GridCacheEntryEx<K, V>> entries() { + return map.entries0(); + } + + /** + * @return Set of internal cached entry representations, including {@link GridCacheInternal} keys. + */ + public Set<GridCacheEntryEx<K, V>> allEntries() { + return map.allEntries0(); + } + + /** {@inheritDoc} */ + @Override public Set<CacheEntry<K, V>> entrySet() { + return entrySet((IgnitePredicate<CacheEntry<K, V>>[])null); + } + + + /** {@inheritDoc} */ + @Override public Set<CacheEntry<K, V>> entrySetx(IgnitePredicate<CacheEntry<K, V>>... filter) { + return map.entriesx(filter); + } + + /** {@inheritDoc} */ + @Override public Set<CacheEntry<K, V>> primaryEntrySetx(IgnitePredicate<CacheEntry<K, V>>... filter) { + return map.entriesx(F.and(filter, F.<K, V>cachePrimary())); + } + + /** {@inheritDoc} */ + @Override public Set<CacheEntry<K, V>> entrySet(int part) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public Set<CacheEntry<K, V>> primaryEntrySet() { - return primaryEntrySet((IgnitePredicate<CacheEntry<K, V>>[])null); ++ return primaryEntrySet((IgnitePredicate<CacheEntry<K, V>>[]) null); + } + + /** {@inheritDoc} */ + @Override public Set<K> keySet() { - return keySet((IgnitePredicate<CacheEntry<K, V>>[])null); ++ return keySet((IgnitePredicate<CacheEntry<K, V>>[]) null); + } + + /** {@inheritDoc} */ + @Override public Set<K> primaryKeySet() { - return primaryKeySet((IgnitePredicate<CacheEntry<K, V>>[])null); ++ return primaryKeySet((IgnitePredicate<CacheEntry<K, V>>[]) null); + } + + /** {@inheritDoc} */ + @Override public Collection<V> values() { - return values((IgnitePredicate<CacheEntry<K, V>>[])null); ++ return values((IgnitePredicate<CacheEntry<K, V>>[]) null); + } + + /** {@inheritDoc} */ + public Collection<V> values(IgnitePredicate<CacheEntry<K, V>>... filter) { + return map.values(filter); + } + + /** {@inheritDoc} */ + @Override public Collection<V> primaryValues() { + return primaryValues((IgnitePredicate<CacheEntry<K, V>>[])null); + } + + /** + * + * @param key Entry key. + */ + public void removeIfObsolete(K key) { + assert key != null; + + GridCacheEntryEx<K, V> entry = map.removeEntryIfObsolete(key); + + if (entry != null) { + assert entry.obsolete() : "Removed non-obsolete entry: " + entry; + + if (log.isDebugEnabled()) + log.debug("Removed entry from cache: " + entry); + + if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED)) + // Event notification. + ctx.events().addEvent(entry.partition(), entry.key(), locNodeId, (IgniteUuid)null, null, + EVT_CACHE_ENTRY_DESTROYED, null, false, null, false, null, null, null); + } + else if (log.isDebugEnabled()) + log.debug("Remove will not be done for key (obsolete entry got replaced or removed): " + key); + } + + /** + * Split clear all task into multiple runnables. + * + * @return Split runnables. + */ + public List<GridCacheClearAllRunnable<K, V>> splitClearAll() { + assert CLEAR_ALL_SPLIT_THRESHOLD > 0; + + int keySize = size(); + + int cnt = Math.min(keySize / CLEAR_ALL_SPLIT_THRESHOLD + (keySize % CLEAR_ALL_SPLIT_THRESHOLD != 0 ? 1 : 0), + Runtime.getRuntime().availableProcessors()); + + if (cnt == 0) + cnt = 1; // Still perform cleanup since there could be entries in swap. + + GridCacheVersion obsoleteVer = ctx.versions().next(); + + List<GridCacheClearAllRunnable<K, V>> res = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) + res.add(new GridCacheClearAllRunnable<>(this, obsoleteVer, i, cnt)); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean clear(K key) { + return clear0(key); + } + + /** {@inheritDoc} */ + @Override public void clearAll() { + ctx.denyOnFlag(READ); + ctx.checkSecurity(GridSecurityPermission.CACHE_REMOVE); + + List<GridCacheClearAllRunnable<K, V>> jobs = splitClearAll(); + + if (!F.isEmpty(jobs)) { + ExecutorService execSvc = null; + + if (jobs.size() > 1) { + execSvc = Executors.newFixedThreadPool(jobs.size() - 1); + + for (int i = 1; i < jobs.size(); i++) + execSvc.submit(jobs.get(i)); + } + + try { + jobs.get(0).run(); + } + finally { + if (execSvc != null) { + execSvc.shutdown(); + + try { + while (!execSvc.isTerminated() && !Thread.currentThread().isInterrupted()) + execSvc.awaitTermination(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ignore) { + U.warn(log, "Got interrupted while waiting for Cache.clearAll() executor service to " + + "finish."); + + Thread.currentThread().interrupt(); + } + } + } + } + } + + /** + * @param keys Keys. + * @param readers Readers flag. + */ + public void clearAll(Collection<? extends K> keys, boolean readers) { + if (F.isEmpty(keys)) + return; + + if (keyCheck) + validateCacheKeys(keys); + + GridCacheVersion obsoleteVer = ctx.versions().next(); + + for (K key : keys) { + GridCacheEntryEx<K, V> e = peekEx(key); + + try { + if (e != null) + e.clear(obsoleteVer, readers, null); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to clear entry (will continue to clear other entries): " + e, + ex); + } + } + } + + /** + * Clears entry from cache. + * + * @param obsoleteVer Obsolete version to set. + * @param key Key to clear. + * @param filter Optional filter. + * @return {@code True} if cleared. + */ + private boolean clear(GridCacheVersion obsoleteVer, K key, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) { + try { + if (ctx.portableEnabled()) + key = (K)ctx.marshalToPortable(key); + + GridCacheEntryEx<K, V> e = peekEx(key); + + return e != null && e.clear(obsoleteVer, false, filter); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to clear entry for key: " + key, ex); + } + + return false; + } + + /** {@inheritDoc} */ + @Override public void globalClearAll() throws IgniteCheckedException { + globalClearAll(0); + } + + /** {@inheritDoc} */ + @Override public void globalClearAll(long timeout) throws IgniteCheckedException { + try { + // Send job to remote nodes only. + Collection<ClusterNode> nodes = ctx.grid().forCache(name()).forRemotes().nodes(); + + IgniteFuture<Object> fut = null; + + if (!nodes.isEmpty()) { + ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); + + fut = ctx.closures().callAsyncNoFailover(BROADCAST, new GlobalClearAllCallable(name()), nodes, true); + } + + // Clear local cache synchronously. + clearAll(); + + if (fut != null) + fut.get(); + } + catch (ClusterGroupEmptyException ignore) { + if (log.isDebugEnabled()) + log.debug("All remote nodes left while cache clear [cacheName=" + name() + "]"); + } + catch (ComputeTaskTimeoutException e) { + U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + + "'networkTimeout' configuration property) [cacheName=" + name() + "]"); + + throw e; + } + } + + /** {@inheritDoc} */ + @Override public boolean compact(K key) throws IgniteCheckedException { + return compact(key, (IgnitePredicate<CacheEntry<K, V>>[])null); + } + + /** {@inheritDoc} */ + @Override public void compactAll() throws IgniteCheckedException { + compactAll(keySet()); + } + + /** + * @param entry Removes entry from cache if currently mapped value is the same as passed. + */ + public void removeEntry(GridCacheEntryEx<K, V> entry) { + map.removeEntry(entry); + } + + /** + * Evicts an entry from cache. + * + * @param key Key. + * @param ver Version. + * @param filter Filter. + * @return {@code True} if entry was evicted. + */ + private boolean evictx(K key, GridCacheVersion ver, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) { + if (ctx.portableEnabled()) { + try { + key = (K)ctx.marshalToPortable(key); + } + catch (PortableException e) { + throw new IgniteException(e); + } + } + + GridCacheEntryEx<K, V> entry = peekEx(key); + + if (entry == null) + return true; + + try { + return ctx.evicts().evict(entry, ver, true, filter); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to evict entry from cache: " + entry, ex); + + return false; + } + } + + /** {@inheritDoc} */ + @Override public V get(K key, @Nullable GridCacheEntryEx<K, V> entry, boolean deserializePortable, + @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException { + String taskName = ctx.kernalContext().job().currentTaskName(); + + return getAllAsync(F.asList(key), !ctx.config().isReadFromBackup(), /*skip tx*/false, entry, null, taskName, + deserializePortable, filter).get().get(key); + } + + /** {@inheritDoc} */ + @Override public V getForcePrimary(K key) throws IgniteCheckedException { + ctx.denyOnFlag(LOCAL); + + String taskName = ctx.kernalContext().job().currentTaskName(); + + return getAllAsync(F.asList(key), /*force primary*/true, /*skip tx*/false, null, null, taskName, true) + .get().get(key); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getForcePrimaryAsync(final K key) { + ctx.denyOnFlag(LOCAL); + + String taskName = ctx.kernalContext().job().currentTaskName(); + + return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null, + taskName, true).chain(new CX1<IgniteFuture<Map<K, V>>, V>() { - @Override public V applyx(IgniteFuture<Map<K, V>> e) throws IgniteCheckedException { ++ @Override ++ public V applyx(IgniteFuture<Map<K, V>> e) throws IgniteCheckedException { + return e.get().get(key); + } + }); + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<K, V> getAllOutTx(List<K> keys) throws IgniteCheckedException { + String taskName = ctx.kernalContext().job().currentTaskName(); + + return getAllAsync(keys, !ctx.config().isReadFromBackup(), /*skip tx*/true, null, null, taskName, true).get(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Map<K, V>> getAllOutTxAsync(List<K> keys) { + String taskName = ctx.kernalContext().job().currentTaskName(); + + return getAllAsync(keys, !ctx.config().isReadFromBackup(), /*skip tx*/true, null, null, taskName, true); + } + + /** {@inheritDoc} */ + @Nullable @Override public V reload(K key) throws IgniteCheckedException { - return reload(key, (IgnitePredicate<CacheEntry<K, V>>[])null); ++ return reload(key, (IgnitePredicate<CacheEntry<K, V>>[]) null); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> reloadAsync(K key) { + return reloadAsync(key, (IgnitePredicate<CacheEntry<K, V>>[])null); + } + + /** {@inheritDoc} */ + @Override public void reloadAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { + reloadAll(keys, (IgnitePredicate<CacheEntry<K, V>>[])null); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys) { + return reloadAllAsync(keys, (IgnitePredicate<CacheEntry<K, V>>[])null); + } + + /** {@inheritDoc} */ + @Override public void reloadAll() throws IgniteCheckedException { + ctx.denyOnFlags(F.asList(LOCAL, READ)); + + reloadAll(keySet(), (IgnitePredicate<CacheEntry<K, V>>[])null); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> reloadAllAsync() { + ctx.denyOnFlags(F.asList(LOCAL, READ)); + - return reloadAllAsync(keySet(), (IgnitePredicate<CacheEntry<K, V>>[])null); ++ return reloadAllAsync(keySet(), (IgnitePredicate<CacheEntry<K, V>>[]) null); + } + + /** + * @param keys Keys. + * @param reload Reload flag. + * @param tx Transaction. + * @param filter Filter. + * @param subjId Subject ID. + * @param taskName Task name. + * @param vis Visitor. + * @return Future. + */ + public IgniteFuture<Object> readThroughAllAsync(final Collection<? extends K> keys, + boolean reload, + @Nullable final IgniteTxEx<K, V> tx, + IgnitePredicate<CacheEntry<K, V>>[] filter, + @Nullable UUID subjId, + String taskName, + final IgniteBiInClosure<K, V> vis) { + return ctx.closures().callLocalSafe(new GPC<Object>() { - @Nullable @Override public Object call() { ++ @Nullable ++ @Override ++ public Object call() { + try { + ctx.store().loadAllFromStore(tx, keys, vis); - } - catch (IgniteCheckedException e) { ++ } catch (IgniteCheckedException e) { + throw new GridClosureException(e); + } + + return null; + } + }, true); + } + + /** + * @param keys Keys. + * @param ret Return flag. + * @param filter Optional filter. + * @return Non-{@code null} map if return flag is {@code true}. + * @throws IgniteCheckedException If failed. + */ + @Nullable public Map<K, V> reloadAll(@Nullable Collection<? extends K> keys, boolean ret, + @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException { + UUID subjId = ctx.subjectIdPerCall(null); + + String taskName = ctx.kernalContext().job().currentTaskName(); + + return reloadAllAsync(keys, ret, subjId, taskName, filter).get(); + } + + /** + * @param keys Keys. + * @param ret Return flag. + * @param filter Filter. + * @return Future. + */ + public IgniteFuture<Map<K, V>> reloadAllAsync(@Nullable Collection<? extends K> keys, boolean ret, + @Nullable UUID subjId, String taskName, @Nullable final IgnitePredicate<CacheEntry<K, V>>... filter) { + ctx.denyOnFlag(READ); + + final long topVer = ctx.affinity().affinityTopologyVersion(); + + if (!F.isEmpty(keys)) { + try { + final String uid = CU.uuid(); // Get meta UUID for this thread. + + assert keys != null; + + if (keyCheck) + validateCacheKeys(keys); + + for (K key : keys) { + if (key == null) + continue; + + // Skip primary or backup entries for near cache. + if (ctx.isNear() && ctx.affinity().localNode(key, topVer)) + continue; + + while (true) { + try { + GridCacheEntryEx<K, V> entry = entryExSafe(key, topVer); + + if (entry == null) + break; + + // Get version before checking filer. + GridCacheVersion ver = entry.version(); + + if (ctx.isAll(entry, filter)) + // Tag entry with current version. + entry.addMeta(uid, ver); + else + ctx.evicts().touch(entry, topVer); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry for reload (will retry): " + key); + } + catch (GridDhtInvalidPartitionException ignore) { + if (log.isDebugEnabled()) + log.debug("Got invalid partition for key (will skip): " + key); + + break; + } + } + } + + final Map<K, V> map = ret ? new HashMap<K, V>(keys.size(), 1.0f) : null; + + final Collection<? extends K> absentKeys = F.view(keys, CU.keyHasMeta(ctx, uid)); + + final Collection<K> loadedKeys = new GridConcurrentHashSet<>(); + + IgniteFuture<Object> readFut = + readThroughAllAsync(absentKeys, true, null, filter, subjId, taskName, new CI2<K, V>() { + /** Version for all loaded entries. */ + private GridCacheVersion nextVer = ctx.versions().next(); + + /** {@inheritDoc} */ + @Override public void apply(K key, V val) { + loadedKeys.add(key); + + GridCacheEntryEx<K, V> entry = peekEx(key); + + if (entry != null) { + try { + GridCacheVersion curVer = entry.removeMeta(uid); + + // If entry passed the filter. + if (curVer != null) { + boolean wasNew = entry.isNewLocked(); + + entry.unswap(); + + boolean set = entry.versionedValue(val, curVer, nextVer); + + ctx.evicts().touch(entry, topVer); + + if (map != null) { + if (set || wasNew) + map.put(key, val); + else { + try { + GridTuple<V> v = peek0(false, key, GLOBAL, filter); + + if (v != null) + map.put(key, val); + } + catch (GridCacheFilterFailedException ex) { + ex.printStackTrace(); + + assert false; + } + } + } + + if (log.isDebugEnabled()) { + log.debug("Set value loaded from store into entry [set=" + set + ", " + + "curVer=" + + curVer + ", newVer=" + nextVer + ", entry=" + entry + ']'); + } + } + else { + if (log.isDebugEnabled()) { + log.debug("Current version was not found (either entry was removed or " + + "validation was not passed: " + entry); + } + } + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) { + log.debug("Got removed entry for reload (will not store reloaded entry) " + + "[entry=" + entry + ']'); + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } + }); + + return readFut.chain(new CX1<IgniteFuture<Object>, Map<K, V>>() { + @Override public Map<K, V> applyx(IgniteFuture<Object> e) throws IgniteCheckedException { + // Touch all not loaded keys. + for (K key : absentKeys) { + if (!loadedKeys.contains(key)) { + GridCacheEntryEx<K, V> entry = peekEx(key); + + if (entry != null) + ctx.evicts().touch(entry, topVer); + } + } + + // Make sure there were no exceptions. + e.get(); + + return map; + } + }); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx.kernalContext(), e); + } + } + + return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); + } + + /** + * @param key Key. + * @return Entry. + */ + @Nullable protected GridCacheEntryEx<K, V> entryExSafe(K key, long topVer) { + return entryEx(key); + } + + /** {@inheritDoc} */ + @Override public boolean evict(K key) { + return evict(key, (IgnitePredicate<CacheEntry<K, V>>[])null); + } + + /** {@inheritDoc} */ + @Override public void evictAll() { + evictAll(keySet()); + } + + /** {@inheritDoc} */ + @Override public void evictAll(@Nullable Collection<? extends K> keys) { + evictAll(keys, (IgnitePredicate<CacheEntry<K, V>>[])null); + } + + /** {@inheritDoc} */ + @Nullable @Override public V get(K key) throws IgniteCheckedException { ++ boolean statsEnabled = ctx.config().isStatisticsEnabled(); ++ ++ long start = statsEnabled ? System.nanoTime() : 0L; ++ + V val = get(key, true, null); + + if (ctx.config().getInterceptor() != null) + val = (V)ctx.config().getInterceptor().onGet(key, val); + ++ if (statsEnabled) ++ metrics0().addGetTimeNanos(System.nanoTime() - start); ++ + return val; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAsync(final K key) { ++ final boolean statsEnabled = ctx.config().isStatisticsEnabled(); ++ ++ final long start = statsEnabled ? System.nanoTime() : 0L; ++ + IgniteFuture<V> fut = getAsync(key, true, null); + + if (ctx.config().getInterceptor() != null) - return fut.chain(new CX1<IgniteFuture<V>, V>() { - @Override public V applyx(IgniteFuture<V> f) throws IgniteCheckedException { - return (V)ctx.config().getInterceptor().onGet(key, f.get()); ++ fut = fut.chain(new CX1<IgniteFuture<V>, V>() { ++ @Override ++ public V applyx(IgniteFuture<V> f) throws IgniteCheckedException { ++ return (V) ctx.config().getInterceptor().onGet(key, f.get()); + } + }); + ++ if (statsEnabled) ++ fut.listenAsync(new UpdateGetTimeStatClosure<V>(metrics0(), start)); ++ + return fut; + } + + /** {@inheritDoc} */ + @Override public Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { ++ boolean statsEnabled = ctx.config().isStatisticsEnabled(); ++ ++ long start = statsEnabled ? System.nanoTime() : 0L; ++ + Map<K, V> map = getAll(keys, true, null); + + if (ctx.config().getInterceptor() != null) + map = interceptGet(keys, map); + ++ if (statsEnabled) ++ metrics0().addGetTimeNanos(System.nanoTime() - start); ++ + return map; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys) { ++ final boolean statsEnabled = ctx.config().isStatisticsEnabled(); ++ ++ final long start = statsEnabled ? System.nanoTime() : 0L; ++ + IgniteFuture<Map<K, V>> fut = getAllAsync(keys, true, null); + + if (ctx.config().getInterceptor() != null) + return fut.chain(new CX1<IgniteFuture<Map<K, V>>, Map<K, V>>() { + @Override public Map<K, V> applyx(IgniteFuture<Map<K, V>> f) throws IgniteCheckedException { + return interceptGet(keys, f.get()); + } + }); + ++ if (statsEnabled) ++ fut.listenAsync(new UpdateGetTimeStatClosure<Map<K, V>>(metrics0(), start)); ++ + return fut; + } + + /** + * Applies cache interceptor on result of 'get' operation. + * + * @param keys All requested keys. + * @param map Result map. + * @return Map with values returned by cache interceptor.. + */ + @SuppressWarnings("IfMayBeConditional") + private Map<K, V> interceptGet(@Nullable Collection<? extends K> keys, Map<K, V> map) { + if (F.isEmpty(keys)) + return map; + + CacheInterceptor<K, V> interceptor = cacheCfg.getInterceptor(); + + assert interceptor != null; + + Map<K, V> res = U.newHashMap(keys.size()); + + for (Map.Entry<K, V> e : map.entrySet()) { + V val = interceptor.onGet(e.getKey(), e.getValue()); + + if (val != null) + res.put(e.getKey(), val); + } + + if (map.size() != keys.size()) { // Not all requested keys were in cache. + for (K key : keys) { + if (key != null) { + if (!map.containsKey(key)) { + V val = interceptor.onGet(key, null); + + if (val != null) + res.put(key, val); + } + } + } + } + + return res; + } + + /** {@inheritDoc} */ + protected IgniteFuture<Map<K, V>> getAllAsync( + @Nullable Collection<? extends K> keys, + boolean forcePrimary, + boolean skipTx, + @Nullable GridCacheEntryEx<K, V> entry, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, + @Nullable IgnitePredicate<CacheEntry<K, V>>... filter + ) { + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); + + subjId = ctx.subjectIdPerCall(subjId, prj); + + return getAllAsync(keys, + true, + entry, + !skipTx, + subjId, + taskName, + deserializePortable, + forcePrimary, + accessExpiryPolicy(prj != null ? prj.expiry() : null), + filter); + } + + /** {@inheritDoc} */ + public IgniteFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys, + boolean readThrough, + @Nullable GridCacheEntryEx<K, V> cached, + boolean checkTx, + @Nullable final UUID subjId, + final String taskName, + final boolean deserializePortable, + final boolean forcePrimary, + @Nullable IgniteCacheExpiryPolicy expiry, + @Nullable final IgnitePredicate<CacheEntry<K, V>>... filter + ) { + ctx.checkSecurity(GridSecurityPermission.CACHE_READ); + + ctx.denyOnFlag(LOCAL); + + // Entry must be passed for one key only. + assert cached == null || keys.size() == 1; + assert ctx.portableEnabled() || cached == null || F.first(keys).equals(cached.key()); + + if (F.isEmpty(keys)) + return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); + + if (keyCheck) + validateCacheKeys(keys); + + IgniteTxLocalAdapter<K, V> tx = null; + + if (checkTx) { + try { + checkJta(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx.kernalContext(), e); + } + + tx = ctx.tm().threadLocalTx(); + } + + if (tx == null || tx.implicit()) { + try { + assert keys != null; + + final long topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); + + final Map<K, V> map = new GridLeanMap<>(keys.size()); + + Map<K, GridCacheVersion> misses = null; + + for (K key : keys) { + // Ignore null keys. + if (key == null) + continue; + + while (true) { + GridCacheEntryEx<K, V> entry; + + if (cached != null) { + entry = cached; + + cached = null; + } + else + entry = entryEx(key); + + try { + V val = entry.innerGet(null, + ctx.isSwapOrOffheapEnabled(), + /*don't read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /*update-metrics*/true, + /*event*/true, + /*temporary*/false, + subjId, + null, + taskName, + filter, + expiry); + + GridCacheVersion ver = entry.version(); + + if (val == null) { + if (misses == null) + misses = new GridLeanMap<>(); + + misses.put(key, ver); + } + else { + val = ctx.cloneOnFlag(val); + + if (ctx.portableEnabled() && deserializePortable) + val = (V)ctx.unwrapPortableIfNeeded(val, false); + + map.put(key, val); + + if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) + ctx.evicts().touch(entry, topVer); + + if (keys.size() == 1) + // Safe to return because no locks are required in READ_COMMITTED mode. + return new GridFinishedFuture<>(ctx.kernalContext(), map); + } + + break; + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in getAllAsync(..) method (will retry): " + key); + } + catch (GridCacheFilterFailedException ignore) { + if (log.isDebugEnabled()) + log.debug("Filter validation failed for entry: " + entry); + + if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) + ctx.evicts().touch(entry, topVer); + + break; // While loop. + } + } + } + + if (misses != null && readThrough && ctx.readThrough()) { + final Map<K, GridCacheVersion> loadKeys = misses; + + final Collection<K> redos = new LinkedList<>(); + + final IgniteTxLocalAdapter<K, V> tx0 = tx; + + final Collection<K> loaded = new HashSet<>(); + + return new GridEmbeddedFuture<>( + ctx.kernalContext(), + ctx.closures().callLocalSafe(ctx.projectSafe(new GPC<Map<K, V>>() { + @Override public Map<K, V> call() throws Exception { + ctx.store().loadAllFromStore(null/*tx*/, loadKeys.keySet(), new CI2<K, V>() { + /** New version for all new entries. */ + private GridCacheVersion nextVer; + + @Override public void apply(K key, V val) { + GridCacheVersion ver = loadKeys.get(key); + + if (ver == null) { + if (log.isDebugEnabled()) + log.debug("Value from storage was never asked for [key=" + key + + ", val=" + val + ']'); + + return; + } + + // Initialize next version. + if (nextVer == null) + nextVer = ctx.versions().next(); + + loaded.add(key); + + while (true) { + GridCacheEntryEx<K, V> entry = entryEx(key); + + try { + boolean set = entry.versionedValue(val, ver, nextVer); + + if (log.isDebugEnabled()) + log.debug("Set value loaded from store into entry [set=" + set + + ", curVer=" + ver + ", newVer=" + nextVer + ", " + + "entry=" + entry + ']'); + + boolean touch = true; + + // Don't put key-value pair into result map if value is null. + if (val != null) { + if (set || F.isEmptyOrNulls(filter)) + map.put(key, ctx.cloneOnFlag(val)); + else { + touch = false; + + // Try again, so we can return consistent values. + redos.add(key); + } + } + + if (touch && (tx0 == null || (!tx0.implicit() && + tx0.isolation() == READ_COMMITTED))) + ctx.evicts().touch(entry, topVer); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during getAllAsync (will retry): " + + entry); + } + catch (IgniteCheckedException e) { + // Wrap errors (will be unwrapped). + throw new GridClosureException(e); + } + } + } + }); + + if (loaded.size() != loadKeys.size()) { + for (K key : loadKeys.keySet()) { + if (loaded.contains(key)) + continue; + + if (tx0 == null || (!tx0.implicit() && + tx0.isolation() == READ_COMMITTED)) { + GridCacheEntryEx<K, V> entry = peekEx(key); + + if (entry != null) + ctx.evicts().touch(entry, topVer); + } + } + } + + return map; + } + }), true), + new C2<Map<K, V>, Exception, IgniteFuture<Map<K, V>>>() { + @Override public IgniteFuture<Map<K, V>> apply(Map<K, V> map, Exception e) { + if (e != null) + return new GridFinishedFuture<>(ctx.kernalContext(), e); + + if (tx0 == null || (!tx0.implicit() && tx0.isolation() == READ_COMMITTED)) { + Collection<K> notFound = new HashSet<>(loadKeys.keySet()); + + notFound.removeAll(loaded); + + // Touch entries that were not found in store. + for (K key : notFound) { + GridCacheEntryEx<K, V> entry = peekEx(key); + + if (entry != null) + ctx.evicts().touch(entry, topVer); + } + } + + if (!redos.isEmpty()) + // Future recursion. + return getAllAsync(redos, forcePrimary, /*skip tx*/false, + /*entry*/null, subjId, taskName, deserializePortable, filter); + + // There were no misses. + return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, + V>emptyMap()); + } + }, + new C2<Map<K, V>, Exception, Map<K, V>>() { + @Override public Map<K, V> apply(Map<K, V> loaded, Exception e) { + if (e == null) + map.putAll(loaded); + + return map; + } + } + ); + } + else { + // If misses is not empty and store is disabled, we should touch missed entries. + if (misses != null) { + for (K key : misses.keySet()) { + GridCacheEntryEx<K, V> entry = peekEx(key); + + if (entry != null) + ctx.evicts().touch(entry, topVer); + } + } + } + + return new GridFinishedFuture<>(ctx.kernalContext(), map); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx.kernalContext(), e); + } + } + else { + final GridCacheEntryEx<K, V> cached0 = cached; + + return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { + @Override public IgniteFuture<Map<K, V>> op(IgniteTxLocalAdapter<K, V> tx) { + return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, cached0, deserializePortable, filter)); + } + }); + } + } + + /** {@inheritDoc} */ + @Override public V put(K key, V val, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) + throws IgniteCheckedException { + return put(key, val, null, -1, filter); + } + + /** {@inheritDoc} */ + @Nullable @Override public V put(final K key, final V val, @Nullable final GridCacheEntryEx<K, V> cached, + final long ttl, @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException { ++ boolean statsEnabled = ctx.config().isStatisticsEnabled(); ++ ++ long start = statsEnabled ? System.nanoTime() : 0L; ++ + A.notNull(key, "key", val, "val"); + + if (keyCheck) + validateCacheKey(key); + + validateCacheValue(val); + + ctx.denyOnLocalRead(); + - return ctx.cloneOnFlag(syncOp(new SyncOp<V>(true) { ++ V prevValue = ctx.cloneOnFlag(syncOp(new SyncOp<V>(true) { + @Override public V op(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException { + return tx.putAllAsync(ctx, F.t(key, val), true, cached, ttl, filter).get().value(); + } + + @Override public String toString() { + return "put [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']'; + } + })); ++ ++ if (statsEnabled) ++ metrics0().addPutAndGetTimeNanos(System.nanoTime() - start); ++ ++ return prevValue; + } + + /** {@inheritDoc} */ + @Override public boolean putx(final K key, final V val, @Nullable final GridCacheEntryEx<K, V> cached, + final long ttl, @Nullable final IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException { + A.notNull(key, "key", val, "val"); + + if (keyCheck) + validateCacheKey(key); + + validateCacheValue(val); + + ctx.denyOnLocalRead(); + + return syncOp(new SyncOp<Boolean>(true) { + @Override + public Boolean op(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException { + return tx.putAllAsync(ctx, F.t(key, val), false, cached, ttl, filter).get().success(); + } + + @Override + public String toString() { + return "put [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']'; + } + }); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> putAsync(K key, V val, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) { - return putAsync(key, val, null, -1, filter); ++ final boolean statsEnabled = ctx.config().isStatisticsEnabled(); ++ ++ final long start = statsEnabled ? System.nanoTime() : 0L; ++ ++ IgniteFuture<V> fut = putAsync(key, val, null, -1, filter); ++ ++ if (statsEnabled) ++ fut.listenAsync(new UpdatePutAndGetTimeStatClosure<V>(metrics0(), start)); ++ ++ return fut; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> putAsync(final K key, final V val, @Nullable final GridCacheEntryEx<K, V> entry, + final long ttl, @Nullable final IgnitePredicate<CacheEntry<K, V>>... filter) { + A.notNull(key, "key", val, "val"); + + if (keyCheck) + validateCacheKey(key); + + validateCacheValue(val); + + ctx.denyOnLocalRead(); + + return ctx.wrapClone(asyncOp(new AsyncOp<V>(key) { + @Override + public IgniteFuture<V> op(IgniteTxLocalAdapter<K, V> tx) { + return tx.putAllAsync(ctx, F.t(key, val), true, entry, ttl, filter) + .chain((IgniteClosure<IgniteFuture<GridCacheReturn<V>>, V>)RET2VAL); + } + + @Override + public String toString() { + return "putAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']'; + } + })); + } + + /** {@inheritDoc} */ + @Override public boolean putx(final K key, final V val, + final IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException { ++ boolean statsEnabled = ctx.config().isStatisticsEnabled(); ++ ++ long start = statsEnabled ? System.nanoTime() : 0L; ++ + A.notNull(key, "key", val, "val"); + + if (keyCheck) + validateCacheKey(key); + + validateCacheValue(val); + + ctx.denyOnLocalRead(); + - return syncOp(new SyncOp<Boolean>(true) { ++ Boolean stored = syncOp(new SyncOp<Boolean>(true) { + @Override + public Boolean op(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException { + return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, filter).get().success(); + } + + @Override + public String toString() { + return "putx [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']'; + } + }); ++ ++ if (statsEnabled) ++ metrics0().addPutTimeNanos(System.nanoTime() - start); ++ ++ return stored; + } + + /** {@inheritDoc} */ + @Override public void putAllDr(final Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException { + if (F.isEmpty(drMap)) + return; + + ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); + + ctx.denyOnLocalRead(); + + syncOp(new SyncInOp(drMap.size() == 1) { + @Override public void inOp(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException { + tx.putAllDrAsync(ctx, drMap).get(); + } + + @Override public String toString() { + return "putAllDr [drMap=" + drMap + ']'; + } + }); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> putAllDrAsync(final Map<? extends K, GridCacheDrInfo<V>> drMap) + throws IgniteCheckedException { + if (F.isEmpty(drMap)) + return new GridFinishedFuture<Object>(ctx.kernalContext()); + + ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); + + ctx.denyOnLocalRead(); + + return asyncOp(new AsyncInOp(drMap.keySet()) { - @Override public IgniteFuture<?> inOp(IgniteTxLocalAdapter<K, V> tx) { ++ @Override ++ public IgniteFuture<?> inOp(IgniteTxLocalAdapter<K, V> tx) { + return tx.putAllDrAsync(ctx, drMap); + } + - @Override public String toString() { ++ @Override ++ public String toString() { + return "putAllDrAsync [drMap=" + drMap + ']'; + } + }); + } + + /** {@inheritDoc} */ + @Override public <T> EntryProcessorResult<T> invoke(final K key, + final EntryProcessor<K, V, T> entryProcessor, + final Object... args) + throws IgniteCheckedException { + A.notNull(key, "key", entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKey(key); + + ctx.denyOnLocalRead(); + + return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { + @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter<K, V> tx) + throws IgniteCheckedException { + Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = - Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); ++ Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor); + + IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut = + tx.invokeAsync(ctx, invokeMap, args); + + Map<K, EntryProcessorResult<T>> resMap = fut.get().value(); + + if (resMap != null) { + assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); + + return resMap.isEmpty() ? null : resMap.values().iterator().next(); + } + + return null; + } + }); + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(final Set<? extends K> keys, + final EntryProcessor<K, V, T> entryProcessor, + final Object... args) throws IgniteCheckedException { + A.notNull(keys, "keys", entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKeys(keys); + + ctx.denyOnLocalRead(); + + return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) { - @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) - throws IgniteCheckedException { ++ @Nullable ++ @Override ++ public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) ++ throws IgniteCheckedException { + Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { - @Override public EntryProcessor apply(K k) { ++ @Override ++ public EntryProcessor apply(K k) { + return entryProcessor; + } + }); + + IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut = - tx.invokeAsync(ctx, invokeMap, args); ++ tx.invokeAsync(ctx, invokeMap, args); + + return fut.get().value(); + } + }); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync( + final K key, + final EntryProcessor<K, V, T> entryProcessor, + final Object... args) + throws EntryProcessorException { + A.notNull(key, "key", entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKey(key); + + ctx.denyOnLocalRead(); + + IgniteFuture<?> fut = asyncOp(new AsyncInOp(key) { + @Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) { + Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = - Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); ++ Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor); + + return tx.invokeAsync(ctx, invokeMap, args); + } + + @Override public String toString() { + return "invokeAsync [key=" + key + ", entryProcessor=" + entryProcessor + ']'; + } + }); + + IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut0 = + (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)fut; + + return fut0.chain(new CX1<IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>, EntryProcessorResult<T>>() { + @Override public EntryProcessorResult<T> applyx(IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut) + throws IgniteCheckedException { + GridCacheReturn<Map<K, EntryProcessorResult<T>>> ret = fut.get(); + + Map<K, EntryProcessorResult<T>> resMap = ret.value(); + + if (resMap != null) { + assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); + + return resMap.isEmpty() ? null : resMap.values().iterator().next(); + } + + return null; + } + }); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + final Set<? extends K> keys, + final EntryProcessor<K, V, T> entryProcessor, + final Object... args) { + A.notNull(keys, "keys", entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKeys(keys); + +
<TRUNCATED>