// Origin: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
// Combined diff (diff --cc, index 0000000,0c01c6c..5312f18, mode 000000,100644..100644, @@@ -1,0 -1,818 +1,820 @@@)
// shown here resolved to the merge result.
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.transactions.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;

import static org.apache.ignite.cache.CacheFlag.*;
import static org.apache.ignite.cache.GridCachePeekMode.*;

/**
 * Colocated cache.
 */
public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapter<K, V> {
    /** */
    private static final long serialVersionUID = 0L;

    /**
     * Empty constructor required for {@link Externalizable}
     */
    public GridDhtColocatedCache() {
        // No-op.
    }

    /**
     * @param ctx Cache context.
     */
    public GridDhtColocatedCache(GridCacheContext<K, V> ctx) {
        super(ctx);
    }

    /**
     * Creates colocated cache with specified map.
     *
     * @param ctx Cache context.
     * @param map Cache map.
     */
    public GridDhtColocatedCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) {
        super(ctx, map);
    }

    /** {@inheritDoc} */
    @Override public boolean isColocated() {
        return true;
    }

    /** {@inheritDoc} */
    @Override protected void init() {
        // Make the cache map produce colocated entries.
        map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
            /** {@inheritDoc} */
            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
                V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
                return new GridDhtColocatedCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId);
            }
        });
    }

    /** {@inheritDoc} */
    @Override public void start() throws IgniteCheckedException {
        super.start();

        // Route get responses to the matching get future.
        ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse<K, V>>() {
            @Override public void apply(UUID nodeId, GridNearGetResponse<K, V> res) {
                processGetResponse(nodeId, res);
            }
        });

        // Route lock responses to the matching colocated lock future.
        ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse<K, V>>() {
            @Override public void apply(UUID nodeId, GridNearLockResponse<K, V> res) {
                processLockResponse(nodeId, res);
            }
        });
    }

    /**
     * Gets or creates entry for given key and given topology version.
     *
     * @param key Key for entry.
     * @param topVer Topology version.
     * @param allowDetached Whether to allow detached entries. If {@code true} and node is not primary
     *      for given key, a new detached entry will be created. Otherwise, entry will be obtained from
     *      dht cache map.
     * @return Cache entry.
     * @throws GridDhtInvalidPartitionException If {@code allowDetached} is false and node is not primary
     *      for given key.
     */
    public GridDistributedCacheEntry<K, V> entryExx(K key, long topVer, boolean allowDetached) {
        return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, topVer) ?
            new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) : entryExx(key, topVer);
    }

    /** {@inheritDoc} */
    @Override public V peek(K key, @Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException {
        GridTuple<V> val = null;

        // 'modes' is declared nullable: a null collection cannot contain NEAR_ONLY, so it must not be dereferenced.
        // NOTE(review): this relies on peek0() accepting null modes - confirm against the parent implementation.
        if (modes == null || !modes.contains(NEAR_ONLY)) {
            try {
                val = peek0(true, key, modes, ctx.tm().txx());
            }
            catch (GridCacheFilterFailedException ignored) {
                if (log.isDebugEnabled())
                    log.debug("Filter validation failed for key: " + key);

                return null;
            }
        }

        return val != null ? val.get() : null;
    }

    /** {@inheritDoc} */
    @Override public boolean isLocked(K key) {
        return ctx.mvcc().isLockedByThread(key, -1);
    }

    /** {@inheritDoc} */
    @Override public boolean isLockedByThread(K key) {
        return ctx.mvcc().isLockedByThread(key, Thread.currentThread().getId());
    }

    /** {@inheritDoc} */
    @Override public IgniteFuture<Map<K, V>> getAllAsync(
        @Nullable final Collection<? extends K> keys,
        boolean forcePrimary,
        boolean skipTx,
        @Nullable final GridCacheEntryEx<K, V> entry,
        @Nullable UUID subjId,
        String taskName,
        final boolean deserializePortable,
        @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter
    ) {
        ctx.denyOnFlag(LOCAL);
        ctx.checkSecurity(GridSecurityPermission.CACHE_READ);

        if (F.isEmpty(keys))
            return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());

        IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx();

        // Inside an explicit transaction the read goes through the transaction itself.
        if (tx != null && !tx.implicit() && !skipTx) {
            return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
                @Override public IgniteFuture<Map<K, V>> op(IgniteTxLocalAdapter<K, V> tx) {
                    return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, entry, deserializePortable, filter));
                }
            });
        }

        long topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();

        GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();

        subjId = ctx.subjectIdPerCall(subjId, prj);

        return loadAsync(keys,
            true,
            false,
            forcePrimary,
            topVer,
            subjId,
            taskName,
            deserializePortable,
            filter,
            accessExpiryPolicy(prj != null ? prj.expiry() : null));
    }

    /** {@inheritDoc} */
    @Override protected GridCacheEntryEx<K, V> entryExSafe(K key, long topVer) {
        try {
            return ctx.affinity().localNode(key, topVer) ? entryEx(key) : null;
        }
        catch (GridDhtInvalidPartitionException ignored) {
            return null;
        }
    }

    /** {@inheritDoc} */
    @Override public boolean containsKey(K key, @Nullable IgnitePredicate<CacheEntry<K, V>> filter) {
        A.notNull(key, "key");

        // We need detached entry here because if there is an ongoing transaction,
        // we should see this entry and apply filter.
        GridCacheEntryEx<K, V> e = entryExx(key, ctx.affinity().affinityTopologyVersion(), true, true);

        try {
            return e != null && e.peek(SMART, filter) != null;
        }
        catch (GridCacheEntryRemovedException ignore) {
            if (log.isDebugEnabled())
                log.debug("Got removed entry during peek (will ignore): " + e);

            return false;
        }
    }

    /**
     * Loads values for the given keys. Unless {@code reload} or {@code forcePrimary} is set, the values
     * are first looked up locally; a get future is created only when that is not possible for all keys.
     *
     * @param keys Keys to load.
     * @param readThrough Read through flag.
     * @param reload Reload flag.
     * @param forcePrimary Force get from primary node flag.
     * @param topVer Topology version.
     * @param subjId Subject ID.
     * @param taskName Task name.
     * @param deserializePortable Deserialize portable flag.
     * @param filter Filter.
     * @param expiryPlc Expiry policy.
     * @return Loaded values.
     */
    public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys,
        boolean readThrough,
        boolean reload,
        boolean forcePrimary,
        long topVer,
        @Nullable UUID subjId,
        String taskName,
        boolean deserializePortable,
        @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter,
        @Nullable IgniteCacheExpiryPolicy expiryPlc) {
        if (keys == null || keys.isEmpty())
            return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());

        if (keyCheck)
            validateCacheKeys(keys);

        if (expiryPlc == null)
            expiryPlc = accessExpiryPolicy(ctx.expiry());

        // Optimisation: try to resolve value locally and escape 'get future' creation.
        if (!reload && !forcePrimary) {
            Map<K, V> locVals = new HashMap<>(keys.size(), 1.0f);

            boolean success = true;

            // Optimistically expect that all keys are available locally (avoid creation of get future).
            for (K key : keys) {
                GridCacheEntryEx<K, V> entry = null;

                while (true) {
                    try {
                        entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);

                        // If our DHT cache does have a value, then we peek it.
                        if (entry != null) {
                            boolean isNew = entry.isNewLocked();

                            // Metrics are not updated by the entry here: a read hit is recorded below,
                            // once per key that was resolved locally.
                            V v = entry.innerGet(null,
                                /*swap*/true,
                                /*read-through*/false,
                                /*fail-fast*/true,
                                /*unmarshal*/true,
                                /*update-metrics*/false,
                                /*event*/true,
                                /*temporary*/false,
                                subjId,
                                null,
                                taskName,
                                filter,
                                expiryPlc);

                            // Entry was not in memory or in swap, so we remove it from cache.
                            if (v == null) {
                                GridCacheVersion obsoleteVer = context().versions().next();

                                if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
                                    removeIfObsolete(key);

                                success = false;
                            }
                            else {
                                if (ctx.portableEnabled())
                                    v = (V)ctx.unwrapPortableIfNeeded(v, !deserializePortable);

                                locVals.put(key, v);
                            }
                        }
                        else
                            success = false;

                        break; // While.
                    }
                    catch (GridCacheEntryRemovedException ignored) {
                        // No-op, retry.
                    }
                    catch (GridCacheFilterFailedException ignored) {
                        // No-op, skip the key.
                        break;
                    }
                    catch (GridDhtInvalidPartitionException ignored) {
                        success = false;

                        break; // While.
                    }
                    catch (IgniteCheckedException e) {
                        return new GridFinishedFuture<>(ctx.kernalContext(), e);
                    }
                    finally {
                        if (entry != null)
                            context().evicts().touch(entry, topVer);
                    }
                }

                if (!success)
                    break; // For: fall back to the get future for the whole key set.
                else
                    ctx.cache().metrics0().onRead(true);
            }

            if (success) {
                sendTtlUpdateRequest(expiryPlc);

                return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
            }
        }

        if (expiryPlc != null)
            expiryPlc.reset();

        // Either reload or not all values are available locally.
        GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
            keys,
            topVer,
            readThrough,
            reload,
            forcePrimary,
            filter,
            subjId,
            taskName,
            deserializePortable,
            expiryPlc);

        fut.init();

        return ctx.wrapCloneMap(fut);
    }

    /**
     * This is an entry point to pessimistic locking within transaction.
     *
     * {@inheritDoc}
     */
    @Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
        long timeout,
        @Nullable IgniteTxLocalEx<K, V> tx,
        boolean isInvalidate,
        boolean isRead,
        boolean retval,
        @Nullable IgniteTxIsolation isolation,
        long accessTtl,
        IgnitePredicate<CacheEntry<K, V>>[] filter) {
        assert tx == null || tx instanceof GridNearTxLocal;

        GridNearTxLocal<K, V> txx = (GridNearTxLocal<K, V>)tx;

        GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx,
            keys,
            txx,
            isRead,
            retval,
            timeout,
            accessTtl,
            filter);

        // Future will be added to mvcc only if it was mapped to remote nodes.
        fut.map();

        return fut;
    }

    /** {@inheritDoc} */
    @Override public GridNearTransactionalCache<K, V> near() {
        assert false : "Near cache is not available in colocated mode.";

        return null;
    }

    /** {@inheritDoc} */
    @Override public CacheEntry<K, V> entry(K key) throws GridDhtInvalidPartitionException {
        return new GridDhtCacheEntryImpl<>(ctx.projectionPerCall(), ctx, key, null);
    }

    /** {@inheritDoc} */
    @Override public void unlockAll(Collection<? extends K> keys,
        IgnitePredicate<CacheEntry<K, V>>[] filter) {
        if (keys.isEmpty())
            return;

        try {
            GridCacheVersion ver = null;

            int keyCnt = -1;

            Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null;

            Collection<K> locKeys = new LinkedList<>();

            for (K key : keys) {
                GridDistributedCacheEntry<K, V> entry = peekExx(key);

                CacheEntry<K, V> cacheEntry = entry == null ? entry(key) : entry.wrap(false);

                if (!ctx.isAll(cacheEntry, filter))
                    break; // For: the first key rejected by the filter stops processing of the remaining keys.

                GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), key, null);

                if (lock != null) {
                    final long topVer = lock.topologyVersion();

                    assert topVer > 0;

                    // Lazily sized on the first removed lock, using that lock's topology version.
                    if (map == null) {
                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer);

                        keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size());

                        map = U.newHashMap(affNodes.size());
                    }

                    if (ver == null)
                        ver = lock.version();

                    // Send request to remove from remote nodes.
                    ClusterNode primary = ctx.affinity().primary(key, topVer);

                    if (!lock.reentry()) {
                        if (!ver.equals(lock.version()))
                            throw new IgniteCheckedException("Failed to unlock (if keys were locked separately, " +
                                "then they need to be unlocked separately): " + keys);

                        if (!primary.isLocal())
                            enlistRemoteUnlockKey(map, primary, keyCnt, ver, key, entry);
                        else
                            locKeys.add(key);

                        if (log.isDebugEnabled())
                            log.debug("Removed lock (will distribute): " + lock);
                    }
                    else if (log.isDebugEnabled())
                        log.debug("Current thread still owns lock (or there are no other nodes)" +
                            " [lock=" + lock + ", curThreadId=" + Thread.currentThread().getId() + ']');
                }
            }

            // No lock was removed, hence nothing to release locally or remotely.
            if (ver == null)
                return;

            if (!locKeys.isEmpty())
                removeLocks(ctx.localNodeId(), ver, locKeys, true);

            // 'map' is non-null here: it is created before 'ver' is first assigned.
            for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping : map.entrySet()) {
                ClusterNode n = mapping.getKey();

                GridDistributedUnlockRequest<K, V> req = mapping.getValue();

                assert !n.isLocal();

                if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys()))
                    // We don't wait for reply to this message.
                    ctx.io().send(n, req);
            }
        }
        catch (IgniteCheckedException ex) {
            U.error(log, "Failed to unlock the lock for keys: " + keys, ex);
        }
    }

    /**
     * Removes the explicit locks registered for the given thread and lock version on the given keys.
     * Keys whose primary node is local are released locally; for all other keys an unlock request
     * is sent to the primary node.
     *
     * @param threadId Thread ID.
     * @param ver Lock version.
     * @param keys Keys.
     */
    public void removeLocks(long threadId, GridCacheVersion ver, Collection<? extends K> keys) {
        if (keys.isEmpty())
            return;

        try {
            int keyCnt = -1;

            Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null;

            Collection<K> locKeys = new LinkedList<>();

            for (K key : keys) {
                GridCacheMvccCandidate<K> lock = ctx.mvcc().removeExplicitLock(threadId, key, ver);

                if (lock != null) {
                    long topVer = lock.topologyVersion();

                    // Lazily sized on the first removed lock, using that lock's topology version.
                    if (map == null) {
                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer);

                        keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size());

                        map = U.newHashMap(affNodes.size());
                    }

                    ClusterNode primary = ctx.affinity().primary(key, topVer);

                    if (!primary.isLocal())
                        // Send request to remove from remote nodes.
                        enlistRemoteUnlockKey(map, primary, keyCnt, ver, key, peekEx(key));
                    else
                        locKeys.add(key);
                }
            }

            if (!locKeys.isEmpty())
                removeLocks(ctx.localNodeId(), ver, locKeys, true);

            if (map == null || map.isEmpty())
                return;

            Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
            Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);

            for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping : map.entrySet()) {
                ClusterNode n = mapping.getKey();

                GridDistributedUnlockRequest<K, V> req = mapping.getValue();

                if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys())) {
                    req.completedVersions(committed, rolledback);

                    // We don't wait for reply to this message.
                    ctx.io().send(n, req);
                }
            }
        }
        catch (IgniteCheckedException ex) {
            U.error(log, "Failed to unlock the lock for keys: " + keys, ex);
        }
    }

    /**
     * Adds the key to the unlock request destined for the given primary node, creating that request
     * (and stamping it with the lock version) when the node has none yet.
     *
     * @param map Unlock requests collected so far, by primary node.
     * @param primary Remote primary node for the key.
     * @param keyCnt Key count passed to a newly created request.
     * @param ver Lock version.
     * @param key Key to unlock.
     * @param entry Cache entry for the key, if any; when {@code null} the key is marshalled directly.
     * @throws IgniteCheckedException If the key could not be marshalled or added to the request.
     */
    private void enlistRemoteUnlockKey(Map<ClusterNode, GridNearUnlockRequest<K, V>> map,
        ClusterNode primary,
        int keyCnt,
        GridCacheVersion ver,
        K key,
        @Nullable GridCacheEntryEx<K, V> entry) throws IgniteCheckedException {
        GridNearUnlockRequest<K, V> req = map.get(primary);

        if (req == null) {
            map.put(primary, req = new GridNearUnlockRequest<>(ctx.cacheId(), keyCnt));

            req.version(ver);
        }

        byte[] keyBytes = entry != null ? entry.getOrMarshalKeyBytes() : CU.marshal(ctx.shared(), key);

        req.addKey(key, keyBytes, ctx);
    }

    /**
     * Requests the given keys from the DHT preloader and, once that request completes, acquires the locks.
     *
     * @param cacheCtx Cache context.
     * @param tx Started colocated transaction (if any).
     * @param threadId Thread ID.
     * @param ver Lock version.
     * @param topVer Topology version.
     * @param keys Mapped keys.
     * @param txRead Tx read.
     * @param timeout Lock timeout.
     * @param accessTtl TTL for read operation.
     * @param filter Optional filter.
     * @return Lock future.
     */
    IgniteFuture<Exception> lockAllAsync(
        final GridCacheContext<K, V> cacheCtx,
        @Nullable final GridNearTxLocal<K, V> tx,
        final long threadId,
        final GridCacheVersion ver,
        final long topVer,
        final Collection<K> keys,
        final boolean txRead,
        final long timeout,
        final long accessTtl,
        @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter
    ) {
        assert keys != null;

        IgniteFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);

        // Prevent embedded future creation if possible.
        if (keyFut.isDone()) {
            try {
                // Check for exception.
                keyFut.get();

                return lockAllAsync0(cacheCtx,
                    tx,
                    threadId,
                    ver,
                    topVer,
                    keys,
                    txRead,
                    timeout,
                    accessTtl,
                    filter);
            }
            catch (IgniteCheckedException e) {
                return new GridFinishedFuture<>(ctx.kernalContext(), e);
            }
        }
        else {
            return new GridEmbeddedFuture<>(true, keyFut,
                new C2<Object, Exception, IgniteFuture<Exception>>() {
                    @Override public IgniteFuture<Exception> apply(Object o, Exception exx) {
                        if (exx != null)
                            return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx);

                        return lockAllAsync0(cacheCtx,
                            tx,
                            threadId,
                            ver,
                            topVer,
                            keys,
                            txRead,
                            timeout,
                            accessTtl,
                            filter);
                    }
                },
                ctx.kernalContext());
        }
    }

    /**
     * Acquires the locks: through a DHT lock future when there is no transaction, otherwise through
     * the transaction itself.
     *
     * @param cacheCtx Cache context.
     * @param tx Started colocated transaction (if any).
     * @param threadId Thread ID.
     * @param ver Lock version.
     * @param topVer Topology version.
     * @param keys Mapped keys.
     * @param txRead Tx read.
     * @param timeout Lock timeout.
     * @param accessTtl TTL for read operation.
     * @param filter Optional filter.
     * @return Lock future.
     */
    private IgniteFuture<Exception> lockAllAsync0(
        GridCacheContext<K, V> cacheCtx,
        @Nullable final GridNearTxLocal<K, V> tx,
        long threadId,
        final GridCacheVersion ver,
        final long topVer,
        final Collection<K> keys,
        final boolean txRead,
        final long timeout,
        final long accessTtl,
        @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter) {
        int cnt = keys.size();

        if (tx == null) {
            GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx,
                ctx.localNodeId(),
                ver,
                topVer,
                cnt,
                txRead,
                timeout,
                tx,
                threadId,
                accessTtl,
                filter);

            // Add before mapping.
            if (!ctx.mvcc().addFuture(fut))
                throw new IllegalStateException("Duplicate future ID: " + fut);

            boolean timedout = false;

            for (K key : keys) {
                if (timedout)
                    break;

                while (true) {
                    GridDhtCacheEntry<K, V> entry = entryExx(key, topVer);

                    try {
                        fut.addEntry(key == null ? null : entry);

                        // The future completing while entries are still being added is treated as a timeout.
                        if (fut.isDone())
                            timedout = true;

                        break;
                    }
                    catch (GridCacheEntryRemovedException ignore) {
                        if (log.isDebugEnabled())
                            log.debug("Got removed entry when adding lock (will retry): " + entry);
                    }
                    catch (GridDistributedLockCancelledException e) {
                        if (log.isDebugEnabled())
                            log.debug("Got lock request for cancelled lock (will ignore): " +
                                entry);

                        fut.onError(e);

                        return new GridDhtFinishedFuture<>(ctx.kernalContext(), e);
                    }
                }
            }

            // This will send remote messages.
            fut.map();

            return new GridDhtEmbeddedFuture<>(
                ctx.kernalContext(),
                fut,
                new C2<Boolean, Exception, Exception>() {
                    @Override public Exception apply(Boolean b, Exception e) {
                        if (e != null)
                            e = U.unwrap(e);
                        else if (!b)
                            e = new GridCacheLockTimeoutException(ver);

                        return e;
                    }
                });
        }
        else {
            // Handle implicit locks for pessimistic transactions.
            ctx.tm().txContext(tx);

            if (log.isDebugEnabled())
                log.debug("Performing colocated lock [tx=" + tx + ", keys=" + keys + ']');

            IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx,
                keys,
                tx.implicit(),
                txRead,
                accessTtl);

            return new GridDhtEmbeddedFuture<>(
                ctx.kernalContext(),
                txFut,
                new C2<GridCacheReturn<V>, Exception, Exception>() {
                    @Override public Exception apply(GridCacheReturn<V> ret,
                        Exception e) {
                        if (e != null)
                            e = U.unwrap(e);

                        assert !tx.empty();

                        return e;
                    }
                });
        }
    }

    /**
     * Passes a get response to the get future it belongs to; a response without a registered future is dropped.
     *
     * @param nodeId Sender ID.
     * @param res Response.
     */
    private void processGetResponse(UUID nodeId, GridNearGetResponse<K, V> res) {
        GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future(
            res.version(), res.futureId());

        if (fut == null) {
            if (log.isDebugEnabled())
                log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');

            return;
        }

        fut.onResult(nodeId, res);
    }

    /**
     * Passes a lock response to the colocated lock future it belongs to, if that future is still registered.
     *
     * @param nodeId Node ID.
     * @param res Response.
     */
    private void processLockResponse(UUID nodeId, GridNearLockResponse<K, V> res) {
        assert nodeId != null;
        assert res != null;

        GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc().
            <Boolean>future(res.version(), res.futureId());

        if (fut != null)
            fut.onResult(nodeId, res);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 0000000,25718fc..05dda8a mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@@ -1,0 -1,792 +1,785 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.processors.cache.distributed.near; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.distributed.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.processors.cache.distributed.dht.*; + import org.apache.ignite.internal.processors.cache.transactions.*; + import org.apache.ignite.internal.util.future.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import javax.cache.expiry.*; + import java.io.*; + import java.util.*; + + import static org.apache.ignite.cache.CacheFlag.*; + import static org.apache.ignite.cache.GridCachePeekMode.*; + import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*; + + /** + * Common logic for near caches. + */ + public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAdapter<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Empty constructor required for {@link Externalizable}. + */ + protected GridNearCacheAdapter() { + // No-op. + } + + /** + * @param ctx Context. + */ + protected GridNearCacheAdapter(GridCacheContext<K, V> ctx) { + super(ctx, ctx.config().getNearStartSize()); + } + + /** {@inheritDoc} */ + @Override protected void init() { + map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { + /** {@inheritDoc} */ + @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, + V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + // Can't hold any locks here - this method is invoked when + // holding write-lock on the whole cache map. + return new GridNearCacheEntry<>(ctx, key, hash, val, next, ttl, hdrId); + } + }); + } + + /** + * @return DHT cache. 
+ */ + public abstract GridDhtCacheAdapter<K, V> dht(); + + /** {@inheritDoc} */ + @Override public boolean isNear() { + return true; + } + + /** {@inheritDoc} */ + @Override public GridCachePreloader<K, V> preloader() { + return dht().preloader(); + } + + /** {@inheritDoc} */ + @Override public GridCacheEntryEx<K, V> entryEx(K key, boolean touch) { + GridNearCacheEntry<K, V> entry = null; + + while (true) { + try { + entry = (GridNearCacheEntry<K, V>)super.entryEx(key, touch); + + entry.initializeFromDht(ctx.affinity().affinityTopologyVersion()); + + return entry; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed near entry while initializing from DHT entry (will retry): " + entry); + } + } + } + + /** {@inheritDoc} */ + @Override public GridCacheEntryEx<K, V> entryEx(K key, long topVer) { + GridNearCacheEntry<K, V> entry = null; + + while (true) { + try { + entry = (GridNearCacheEntry<K, V>)super.entryEx(key, topVer); + + entry.initializeFromDht(topVer); + + return entry; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed near entry while initializing from DHT entry (will retry): " + entry); + } + } + } + + /** + * @param key Key. + * @param topVer Topology version. + * @return Entry. + */ + public GridNearCacheEntry<K, V> entryExx(K key, long topVer) { + return (GridNearCacheEntry<K, V>)entryEx(key, topVer); + } + + /** + * @param key Key. + * @return Entry. + */ + @Nullable public GridNearCacheEntry<K, V> peekExx(K key) { + return (GridNearCacheEntry<K, V>)peekEx(key); + } + + /** {@inheritDoc} */ + @Override public boolean isLocked(K key) { + return super.isLocked(key) || dht().isLocked(key); + } + + /** + * @param key Key. + * @return If near entry is locked. + */ + public boolean isLockedNearOnly(K key) { + return super.isLocked(key); + } + + /** + * @param keys Keys. + * @return If near entries for given keys are locked. 
+ */ + public boolean isAllLockedNearOnly(Iterable<? extends K> keys) { + A.notNull(keys, "keys"); + + for (K key : keys) + if (!isLockedNearOnly(key)) + return false; + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "RedundantCast"}) + @Override public IgniteFuture<Object> readThroughAllAsync(Collection<? extends K> keys, boolean reload, + IgniteTxEx<K, V> tx, IgnitePredicate<CacheEntry<K, V>>[] filter, @Nullable UUID subjId, String taskName, + IgniteBiInClosure<K, V> vis) { + return (IgniteFuture)loadAsync(tx, + keys, + reload, + false, + filter, + subjId, + taskName, + true, + null); + } + + /** {@inheritDoc} */ + @Override public void reloadAll(@Nullable Collection<? extends K> keys, + @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException { + dht().reloadAll(keys, filter); + + super.reloadAll(keys, filter); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public IgniteFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys, + @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { + GridCompoundFuture fut = new GridCompoundFuture(ctx.kernalContext()); + + fut.add(super.reloadAllAsync(keys, filter)); + fut.add(dht().reloadAllAsync(keys, filter)); + + fut.markInitialized(); + + return fut; + + } + + /** {@inheritDoc} */ + @Override public V reload(K key, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) + throws IgniteCheckedException { + V val; + + try { + val = dht().reload(key, filter); + } + catch (GridDhtInvalidPartitionException ignored) { + return null; + } + + V nearVal = super.reload(key, filter); + + return val == null ? 
nearVal : val; + } + + /** {@inheritDoc} */ + @Override public void reloadAll() throws IgniteCheckedException { + super.reloadAll(); + + dht().reloadAll(); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override public IgniteFuture<?> reloadAllAsync() { + GridCompoundFuture fut = new GridCompoundFuture(ctx.kernalContext()); + + fut.add(super.reloadAllAsync()); + fut.add(dht().reloadAllAsync()); + + fut.markInitialized(); + + return fut; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override public IgniteFuture<?> reloadAllAsync(@Nullable IgnitePredicate<CacheEntry<K, V>> filter) { + GridCompoundFuture fut = new GridCompoundFuture(ctx.kernalContext()); + + fut.add(super.reloadAllAsync()); + fut.add(dht().reloadAllAsync(filter)); + + fut.markInitialized(); + + return fut; + } + + /** + * @param tx Transaction. + * @param keys Keys to load. + * @param reload Reload flag. + * @param forcePrimary Force primary flag. + * @param filter Filter. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. + * @return Loaded values. + */ + public IgniteFuture<Map<K, V>> loadAsync(@Nullable IgniteTxEx tx, + @Nullable Collection<? extends K> keys, + boolean reload, + boolean forcePrimary, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, + @Nullable ExpiryPolicy expiryPlc) { + if (F.isEmpty(keys)) + return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); + + if (keyCheck) + validateCacheKeys(keys); + + IgniteTxLocalEx<K, V> txx = (tx != null && tx.local()) ? 
(IgniteTxLocalEx<K, V>)tx : null; + + final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc); + + GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, + keys, + true, + reload, + forcePrimary, + txx, + filter, + subjId, + taskName, + deserializePortable, + expiry); + + // init() will register future for responses if future has remote mappings. + fut.init(); + + return ctx.wrapCloneMap(fut); + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiPredicate<K, V> p, long ttl, Object[] args) throws IgniteCheckedException { + dht().loadCache(p, ttl, args); + } + + /** {@inheritDoc} */ + @Override public void localLoad(Collection<? extends K> keys) throws IgniteCheckedException { + dht().localLoad(keys); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> loadCacheAsync(IgniteBiPredicate<K, V> p, long ttl, Object[] args) { + return dht().loadCacheAsync(p, ttl, args); + } + - /** {@inheritDoc} */ - @Override public void resetMetrics() { - super.resetMetrics(); - - dht().resetMetrics(); - } - + /** + * @param nodeId Sender ID. + * @param res Response. + */ + protected void processGetResponse(UUID nodeId, GridNearGetResponse<K, V> res) { + GridNearGetFuture<K, V> fut = (GridNearGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future( + res.version(), res.futureId()); + + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); + + return; + } + + fut.onResult(nodeId, res); + } + + /** {@inheritDoc} */ + @Override public int size() { + return super.size() + dht().size(); + } + + /** {@inheritDoc} */ + @Override public int primarySize() { + return dht().primarySize(); + } + + /** {@inheritDoc} */ + @Override public int nearSize() { + return super.size(); + } + + /** + * @return Near entries. 
+ */ + public Set<CacheEntry<K, V>> nearEntries() { + return super.entrySet(CU.<K, V>empty()); + } + + /** {@inheritDoc} */ + @Override public Set<CacheEntry<K, V>> entrySet( + @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { + return new EntrySet(super.entrySet(filter), dht().entrySet(filter)); + } + + /** {@inheritDoc} */ + @Override public Set<CacheEntry<K, V>> entrySet(int part) { + return dht().entrySet(part); + } + + /** {@inheritDoc} */ + @Override public Set<CacheEntry<K, V>> primaryEntrySet( + @Nullable final IgnitePredicate<CacheEntry<K, V>>... filter) { + final long topVer = ctx.affinity().affinityTopologyVersion(); + + Collection<CacheEntry<K, V>> entries = + F.flatCollections( + F.viewReadOnly( + dht().topology().currentLocalPartitions(), + new C1<GridDhtLocalPartition<K, V>, Collection<CacheEntry<K, V>>>() { + @Override public Collection<CacheEntry<K, V>> apply(GridDhtLocalPartition<K, V> p) { + return F.viewReadOnly( + p.entries(), + new C1<GridDhtCacheEntry<K, V>, CacheEntry<K, V>>() { + @Override public CacheEntry<K, V> apply(GridDhtCacheEntry<K, V> e) { + return e.wrap(true); + } + }, + new P1<GridDhtCacheEntry<K, V>>() { + @Override public boolean apply(GridDhtCacheEntry<K, V> e) { + return !e.obsoleteOrDeleted(); + } + }); + } + }, + new P1<GridDhtLocalPartition<K, V>>() { + @Override public boolean apply(GridDhtLocalPartition<K, V> p) { + return p.primary(topVer); + } + })); + + return new GridCacheEntrySet<>(ctx, entries, filter); + } + + /** {@inheritDoc} */ + @Override public Set<K> keySet(@Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) { + return new GridCacheKeySet<>(ctx, entrySet(filter), null); + } + + /** + * @param filter Entry filter. + * @return Keys for near cache only. + */ + public Set<K> nearKeySet(@Nullable IgnitePredicate<CacheEntry<K, V>> filter) { + return super.keySet(filter); + } + + /** {@inheritDoc} */ + @Override public Set<K> primaryKeySet(@Nullable IgnitePredicate<CacheEntry<K, V>>... 
filter) { + return new GridCacheKeySet<>(ctx, primaryEntrySet(filter), null); + } + + /** {@inheritDoc} */ + @Override public Collection<V> values(IgnitePredicate<CacheEntry<K, V>>... filter) { + return new GridCacheValueCollection<>(ctx, entrySet(filter), ctx.vararg(F.<K, V>cacheHasPeekValue())); + } + + /** {@inheritDoc} */ + @Override public Collection<V> primaryValues(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { + return new GridCacheValueCollection<>(ctx, entrySet(filter), ctx.vararg(F.<K, V>cachePrimary())); + } + + /** {@inheritDoc} */ + @Override public boolean containsKey(K key, IgnitePredicate<CacheEntry<K, V>> filter) { + return super.containsKey(key, filter) || dht().containsKey(key, filter); + } + + /** {@inheritDoc} */ + @Override public boolean evict(K key, @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) { + // Use unary 'and' to make sure that both sides execute. + return super.evict(key, filter) & dht().evict(key, filter); + } + + /** + * @param key Key to evict. + * @param filter Optional filter. + * @return {@code True} if evicted. + */ + public boolean evictNearOnly(K key, @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) { + return super.evict(key, filter); + } + + /** {@inheritDoc} */ + @Override public void evictAll(Collection<? extends K> keys, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) { + super.evictAll(keys, filter); + + dht().evictAll(keys, filter); + } + + /** {@inheritDoc} */ + @Override public boolean compact(K key, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException { + return super.compact(key, filter) | dht().compact(key, filter); + } + + /** {@inheritDoc} */ + @Override public CacheEntry<K, V> entry(K key) { + // We don't try wrap entry from near or dht cache. + // Created object will be wrapped once some method is called. 
+ return new GridPartitionedCacheEntryImpl<>(ctx.projectionPerCall(), ctx, key, null); + } + + /** + * Peeks only near cache without looking into DHT cache. + * + * @param key Key. + * @return Peeked value. + */ + @Nullable public V peekNearOnly(K key) { + try { + GridTuple<V> peek = peek0(true, key, SMART, CU.<K, V>empty()); + + return peek != null ? peek.get() : null; + } + catch (GridCacheFilterFailedException ignored) { + if (log.isDebugEnabled()) + log.debug("Filter validation failed for key: " + key); + + return null; + } + } + + /** {@inheritDoc} */ + @Override public V peek(K key, @Nullable IgnitePredicate<CacheEntry<K, V>> filter) { + try { + GridTuple<V> res = peek0(false, key, SMART, filter); + + if (res != null) + return res.get(); + } + catch (GridCacheFilterFailedException e) { + e.printStackTrace(); + + assert false : "Filter should not fail since fail-fast is false"; + } + + return dht().peek(key, filter); + } + + /** {@inheritDoc} */ + @Override public V peek(K key, @Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException { + GridTuple<V> val = null; + + if (!modes.contains(PARTITIONED_ONLY)) { + try { + val = peek0(true, key, modes, ctx.tm().txx()); + } + catch (GridCacheFilterFailedException ignored) { + if (log.isDebugEnabled()) + log.debug("Filter validation failed for key: " + key); + + return null; + } + } + + if (val != null) + return val.get(); + + return !modes.contains(NEAR_ONLY) ? dht().peek(key, modes) : null; + } + + /** {@inheritDoc} */ + @Override public Map<K, V> peekAll(@Nullable Collection<? extends K> keys, + @Nullable IgnitePredicate<CacheEntry<K, V>>... 
filter) { + final Map<K, V> resMap = super.peekAll(keys, filter); + + if (resMap.size() != keys.size()) + resMap.putAll(dht().peekAll(keys, F.and(filter, new IgnitePredicate<CacheEntry<K, V>>() { + @Override public boolean apply(CacheEntry<K, V> e) { + return !resMap.containsKey(e.getKey()); + } + }))); + + return resMap; + } + + /** {@inheritDoc} */ + @Override public boolean clear0(K key, @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) { + return super.clear0(key, filter) | dht().clear0(key, filter); + } + + /** {@inheritDoc} */ + @Override public void clearAll0(Collection<? extends K> keys, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) { + super.clearAll0(keys, filter); + + dht().clearAll0(keys, filter); + } + + /** {@inheritDoc} */ + @Override public V promote(K key, boolean deserializePortable) throws IgniteCheckedException { + ctx.denyOnFlags(F.asList(READ, SKIP_SWAP)); + + // Unswap only from dht(). Near cache does not have swap storage. + return dht().promote(key, deserializePortable); + } + + /** {@inheritDoc} */ + @Override public V promote(K key) throws IgniteCheckedException { + ctx.denyOnFlags(F.asList(READ, SKIP_SWAP)); + + // Unswap only from dht(). Near cache does not have swap storage. + return dht().promote(key); + } + + /** {@inheritDoc} */ + @Override public void promoteAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { + ctx.denyOnFlags(F.asList(READ, SKIP_SWAP)); + + // Unswap only from dht(). Near cache does not have swap storage. + // In near-only cache this is a no-op. 
+ if (isAffinityNode(ctx.config())) + dht().promoteAll(keys); + } + + /** {@inheritDoc} */ + @Override public Iterator<Map.Entry<K, V>> swapIterator() throws IgniteCheckedException { + ctx.denyOnFlags(F.asList(SKIP_SWAP)); + + return dht().swapIterator(); + } + + /** {@inheritDoc} */ + @Override public Iterator<Map.Entry<K, V>> offHeapIterator() throws IgniteCheckedException { + return dht().offHeapIterator(); + } + + /** {@inheritDoc} */ + @Override public long offHeapEntriesCount() { + return dht().offHeapEntriesCount(); + } + + /** {@inheritDoc} */ + @Override public long offHeapAllocatedSize() { + return dht().offHeapAllocatedSize(); + } + + /** {@inheritDoc} */ + @Override public long swapSize() throws IgniteCheckedException { + return dht().swapSize(); + } + + /** {@inheritDoc} */ + @Override public long swapKeys() throws IgniteCheckedException { + return dht().swapKeys(); + } + + /** {@inheritDoc} */ + @Override public boolean isGgfsDataCache() { + return dht().isGgfsDataCache(); + } + + /** {@inheritDoc} */ + @Override public long ggfsDataSpaceUsed() { + return dht().ggfsDataSpaceUsed(); + } + + /** {@inheritDoc} */ + @Override public long ggfsDataSpaceMax() { + return dht().ggfsDataSpaceMax(); + } + + /** {@inheritDoc} */ + @Override public void onGgfsDataSizeChanged(long delta) { + dht().onGgfsDataSizeChanged(delta); + } + + /** {@inheritDoc} */ + @Override public boolean isMongoDataCache() { + return dht().isMongoDataCache(); + } + + /** {@inheritDoc} */ + @Override public boolean isMongoMetaCache() { + return dht().isMongoMetaCache(); + } + + /** {@inheritDoc} */ + @Override public List<GridCacheClearAllRunnable<K, V>> splitClearAll() { + switch (configuration().getDistributionMode()) { + case NEAR_PARTITIONED: + GridCacheVersion obsoleteVer = ctx.versions().next(); + + List<GridCacheClearAllRunnable<K, V>> dhtJobs = dht().splitClearAll(); + + List<GridCacheClearAllRunnable<K, V>> res = new ArrayList<>(dhtJobs.size()); + + for 
(GridCacheClearAllRunnable<K, V> dhtJob : dhtJobs) + res.add(new GridNearCacheClearAllRunnable<>(this, obsoleteVer, dhtJob)); + + return res; + + case NEAR_ONLY: + return super.splitClearAll(); + + default: + assert false : "Invalid partition distribution mode."; + + return null; + } + } + + /** + * Wrapper for entry set. + */ + private class EntrySet extends AbstractSet<CacheEntry<K, V>> { + /** Near entry set. */ + private Set<CacheEntry<K, V>> nearSet; + + /** Dht entry set. */ + private Set<CacheEntry<K, V>> dhtSet; + + /** + * @param nearSet Near entry set. + * @param dhtSet Dht entry set. + */ + private EntrySet(Set<CacheEntry<K, V>> nearSet, Set<CacheEntry<K, V>> dhtSet) { + assert nearSet != null; + assert dhtSet != null; + + this.nearSet = nearSet; + this.dhtSet = dhtSet; + } + + /** {@inheritDoc} */ + @NotNull @Override public Iterator<CacheEntry<K, V>> iterator() { + return new EntryIterator(nearSet.iterator(), + F.iterator0(dhtSet, false, new P1<CacheEntry<K, V>>() { + @Override public boolean apply(CacheEntry<K, V> e) { + return !GridNearCacheAdapter.super.containsKey(e.getKey(), null); + } + })); + } + + /** {@inheritDoc} */ + @Override public int size() { + return F.size(iterator()); + } + } + + /** + * Entry set iterator. + */ + private class EntryIterator implements Iterator<CacheEntry<K, V>> { + /** */ + private Iterator<CacheEntry<K, V>> dhtIter; + + /** */ + private Iterator<CacheEntry<K, V>> nearIter; + + /** */ + private Iterator<CacheEntry<K, V>> currIter; + + /** */ + private CacheEntry<K, V> currEntry; + + /** + * @param nearIter Near set iterator. + * @param dhtIter Dht set iterator. 
+ */ + private EntryIterator(Iterator<CacheEntry<K, V>> nearIter, Iterator<CacheEntry<K, V>> dhtIter) { + assert nearIter != null; + assert dhtIter != null; + + this.nearIter = nearIter; + this.dhtIter = dhtIter; + + currIter = nearIter; + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return nearIter.hasNext() || dhtIter.hasNext(); + } + + /** {@inheritDoc} */ + @Override public CacheEntry<K, V> next() { + if (!hasNext()) + throw new NoSuchElementException(); + + if (!currIter.hasNext()) + currIter = dhtIter; + + return currEntry = currIter.next(); + } + + /** {@inheritDoc} */ + @Override public void remove() { + if (currEntry == null) + throw new IllegalStateException(); + + assert currIter != null; + + currIter.remove(); + + try { + GridNearCacheAdapter.this.remove(currEntry.getKey(), CU.<K, V>empty()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearCacheAdapter.class, this); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 0000000,8628028..a8e5915 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@@ -1,0 -1,626 +1,627 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

package org.apache.ignite.internal.processors.cache.distributed.near;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.util.*;

import static org.apache.ignite.events.IgniteEventType.*;

/**
 * Near cache entry.
 */
@SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext", "TooBroadScope"})
public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
    /** */
    private static final long serialVersionUID = 0L;

    /** */
    private static final int NEAR_SIZE_OVERHEAD = 36;

    /** ID of primary node from which this entry was last read. */
    private volatile UUID primaryNodeId;

    /** DHT version which caused the last update. */
    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
    private GridCacheVersion dhtVer;

    /** Partition. */
    private final int part;

    /**
     * @param ctx Cache context.
     * @param key Cache key.
     * @param hash Key hash value.
     * @param val Entry value.
     * @param next Next entry in the linked list.
     * @param ttl Time to live.
     * @param hdrId Header id.
     */
    public GridNearCacheEntry(GridCacheContext<K, V> ctx, K key, int hash, V val, GridCacheMapEntry<K, V> next,
        long ttl, int hdrId) {
        super(ctx, key, hash, val, next, ttl, hdrId);

        // Partition is resolved once from the key at construction time.
        part = ctx.affinity().partition(key);
    }

    /** {@inheritDoc} */
    @Override public int memorySize() throws IgniteCheckedException {
        return super.memorySize() + NEAR_SIZE_OVERHEAD;
    }

    /** {@inheritDoc} */
    @Override public int partition() {
        return part;
    }

    /** {@inheritDoc} */
    @Override public boolean isNear() {
        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The entry is valid only while the recorded primary node is still known to discovery
     * and is still the affinity primary for the key on {@code topVer}. In every other case
     * the recorded primary node ID is reset to {@code null}.
     */
    @Override public boolean valid(long topVer) {
        assert topVer > 0 : "Topology version is invalid: " + topVer;

        // Read the volatile field once.
        UUID primaryNodeId = this.primaryNodeId;

        if (primaryNodeId == null)
            return false;

        if (cctx.discovery().node(primaryNodeId) == null) {
            this.primaryNodeId = null;

            return false;
        }

        // Make sure that primary node is alive before returning this value.
        ClusterNode primary = cctx.affinity().primary(key(), topVer);

        if (primary != null && primary.id().equals(primaryNodeId))
            return true;

        // Primary node changed.
        this.primaryNodeId = null;

        return false;
    }

    /**
     * @param topVer Topology version.
     * @return {@code True} if this entry was initialized by this call.
     * @throws GridCacheEntryRemovedException If this entry is obsolete.
     */
    public boolean initializeFromDht(long topVer) throws GridCacheEntryRemovedException {
        // NOTE(review): the loop re-peeks the DHT entry whenever entry.info() returns null;
        // presumably that happens for an obsolete DHT entry which is about to be replaced - confirm.
        while (true) {
            GridDhtCacheEntry<K, V> entry = cctx.near().dht().peekExx(key);

            if (entry != null) {
                GridCacheEntryInfo<K, V> e = entry.info();

                if (e != null) {
                    GridCacheVersion enqueueVer = null;

                    try {
                        synchronized (this) {
                            checkObsolete();

                            if (isNew() || !valid(topVer)) {
                                // Version does not change for load ops.
                                update(e.value(), e.valueBytes(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version());

                                if (cctx.deferredDelete()) {
                                    boolean deleted = val == null && valBytes == null;

                                    if (deleted != deletedUnlocked()) {
                                        deletedUnlocked(deleted);

                                        if (deleted)
                                            enqueueVer = e.version();
                                    }
                                }

                                recordNodeId(cctx.affinity().primary(key, topVer).id());

                                dhtVer = e.isNew() || e.isDeleted() ? null : e.version();

                                return true;
                            }

                            return false;
                        }
                    }
                    finally {
                        // Deferred-delete callback is invoked after the entry lock is released.
                        if (enqueueVer != null)
                            cctx.onDeferredDelete(this, enqueueVer);
                    }
                }
            }
            else
                return false;
        }
    }

    /**
     * This method should be called only when lock is owned on this entry.
     *
     * @param val Value.
     * @param valBytes Value bytes.
     * @param ver Version.
     * @param dhtVer DHT version.
     * @param primaryNodeId Primary node ID.
     * @return {@code True} if reset was done.
     * @throws GridCacheEntryRemovedException If obsolete.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings( {"RedundantTypeArguments"})
    public boolean resetFromPrimary(V val, byte[] valBytes, GridCacheVersion ver, GridCacheVersion dhtVer,
        UUID primaryNodeId) throws GridCacheEntryRemovedException, IgniteCheckedException {
        assert dhtVer != null;

        cctx.versions().onReceived(primaryNodeId, dhtVer);

        // Unmarshal outside of the entry lock, and only when the DHT version differs from the current one.
        if (valBytes != null && val == null && !cctx.config().isStoreValueBytes()) {
            GridCacheVersion curDhtVer = dhtVersion();

            if (!F.eq(dhtVer, curDhtVer))
                val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader());
        }

        synchronized (this) {
            checkObsolete();

            this.primaryNodeId = primaryNodeId;

            if (!F.eq(this.dhtVer, dhtVer)) {
                value(val, valBytes);

                this.ver = ver;
                this.dhtVer = dhtVer;

                return true;
            }
        }

        return false;
    }

    /**
     * This method should be called only when lock is owned on this entry.
     *
     * @param dhtVer DHT version.
     * @param val Value associated with version.
     * @param valBytes Value bytes.
     * @param expireTime Expire time.
     * @param ttl Time to live.
     * @param primaryNodeId Primary node ID.
     */
    public void updateOrEvict(GridCacheVersion dhtVer, @Nullable V val, @Nullable byte[] valBytes, long expireTime,
        long ttl, UUID primaryNodeId) {
        assert dhtVer != null;

        cctx.versions().onReceived(primaryNodeId, dhtVer);

        synchronized (this) {
            if (!obsolete()) {
                // Don't set DHT version to null until we get a match from DHT remote transaction.
                if (F.eq(this.dhtVer, dhtVer))
                    this.dhtVer = null;

                // If we are here, then we already tried to evict this entry.
                // If cannot evict, then update.
                if (this.dhtVer == null) {
                    if (!markObsolete(dhtVer)) {
                        value(val, valBytes);

                        // NOTE(review): 'ttl' is narrowed from long to int here - confirm that
                        // callers never pass a TTL above Integer.MAX_VALUE.
                        ttlAndExpireTimeExtras((int) ttl, expireTime);

                        this.primaryNodeId = primaryNodeId;
                    }
                }
            }
        }
    }

    /**
     * @return DHT version for this entry.
     * @throws GridCacheEntryRemovedException If obsolete.
     */
    @Nullable public synchronized GridCacheVersion dhtVersion() throws GridCacheEntryRemovedException {
        checkObsolete();

        return dhtVer;
    }

    /**
     * @return Tuple with version and value of this entry, or {@code null} if no DHT version is recorded.
     * @throws GridCacheEntryRemovedException If entry has been removed.
     */
    @Nullable public synchronized GridTuple3<GridCacheVersion, V, byte[]> versionedValue()
        throws GridCacheEntryRemovedException {
        checkObsolete();

        if (dhtVer == null)
            return null;
        else {
            V val0 = null;
            byte[] valBytes0 = null;

            GridCacheValueBytes valBytesTuple = valueBytes();

            if (!valBytesTuple.isNull()) {
                // Plain bytes are handed back as the value itself; otherwise as marshalled bytes.
                if (valBytesTuple.isPlain())
                    val0 = (V)valBytesTuple.get();
                else
                    valBytes0 = valBytesTuple.get();
            }
            else
                val0 = val;

            return F.t(dhtVer, val0, valBytes0);
        }
    }

    /**
     * @return ID of primary node from which this value was loaded.
     */
    UUID nodeId() {
        return primaryNodeId;
    }

    /** {@inheritDoc} */
    @Override protected void recordNodeId(UUID primaryNodeId) {
        assert Thread.holdsLock(this);

        this.primaryNodeId = primaryNodeId;
    }

    /**
     * This method should be called only when committing optimistic transactions.
     *
     * @param dhtVer DHT version to record.
     */
    public synchronized void recordDhtVersion(GridCacheVersion dhtVer) {
        // Version manager must be updated separately, when adding DHT version
        // to transaction entries.
        this.dhtVer = dhtVer;
    }

    /** {@inheritDoc} */
    @Override protected V readThrough(IgniteTxEx<K, V> tx, K key, boolean reload,
        IgnitePredicate<CacheEntry<K, V>>[] filter, UUID subjId, String taskName) throws IgniteCheckedException {
        return cctx.near().loadAsync(tx,
            F.asList(key),
            reload,
            /*force primary*/false,
            filter,
            subjId,
            taskName,
            true,
            null).get().get(key);
    }

    /**
     * @param tx Transaction.
     * @param primaryNodeId Primary node ID.
     * @param val New value.
     * @param valBytes Value bytes.
     * @param ver Version to use.
     * @param dhtVer DHT version received from remote node.
     * @param expVer Optional version to match.
     * @param ttl Time to live.
     * @param expireTime Expiration time.
     * @param evt Event flag.
     * @param topVer Topology version.
     * @param subjId Subject ID.
     * @return {@code True} if initial value was set.
     * @throws IgniteCheckedException In case of error.
     * @throws GridCacheEntryRemovedException If entry was removed.
     */
    @SuppressWarnings({"RedundantTypeArguments"})
    public boolean loadedValue(@Nullable IgniteTxEx tx, UUID primaryNodeId, V val, byte[] valBytes,
        GridCacheVersion ver, GridCacheVersion dhtVer, @Nullable GridCacheVersion expVer, long ttl, long expireTime,
        boolean evt, long topVer, UUID subjId)
        throws IgniteCheckedException, GridCacheEntryRemovedException {
        // NOTE(review): the 'topVer' argument is not read here; validity is checked against the
        // transaction topology version or the current affinity topology version - confirm intent.
        boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion());

        if (valBytes != null && val == null && (isNewLocked() || !valid))
            val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader());

        GridCacheVersion enqueueVer = null;

        try {
            synchronized (this) {
                checkObsolete();

                // Read metrics are updated only when statistics are enabled for the cache.
                if (cctx.cache().configuration().isStatisticsEnabled())
                    cctx.cache().metrics0().onRead(false);

                boolean ret = false;

                V old = this.val;
                boolean hasVal = hasValueUnlocked();

                if (isNew() || !valid || expVer == null || expVer.equals(this.dhtVer)) {
                    this.primaryNodeId = primaryNodeId;

                    // Change entry only if dht version has changed.
                    if (!dhtVer.equals(dhtVersion())) {
                        update(val, valBytes, expireTime, ttl, ver);

                        if (cctx.deferredDelete()) {
                            boolean deleted = val == null && valBytes == null;

                            if (deleted != deletedUnlocked()) {
                                deletedUnlocked(deleted);

                                if (deleted)
                                    enqueueVer = ver;
                            }
                        }

                        recordDhtVersion(dhtVer);

                        ret = true;
                    }
                }

                if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
                    cctx.events().addEvent(partition(), key, tx, null, EVT_CACHE_OBJECT_READ,
                        val, val != null || valBytes != null, old, hasVal, subjId, null, null);

                return ret;
            }
        }
        finally {
            // Deferred-delete callback is invoked after the entry lock is released.
            if (enqueueVer != null)
                cctx.onDeferredDelete(this, enqueueVer);
        }
    }

    /** {@inheritDoc} */
    @Override protected void updateIndex(V val, byte[] valBytes, long expireTime,
        GridCacheVersion ver, V old) throws IgniteCheckedException {
        // No-op: queries are disabled for near cache.
    }

    /** {@inheritDoc} */
    @Override protected void clearIndex(V val) {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public GridCacheMvccCandidate<K> addLocal(
        long threadId,
        GridCacheVersion ver,
        long timeout,
        boolean reenter,
        boolean tx,
        boolean implicitSingle) throws GridCacheEntryRemovedException {
        // Delegates to the near-local variant without a DHT node ID.
        return addNearLocal(
            null,
            threadId,
            ver,
            timeout,
            reenter,
            tx,
            implicitSingle
        );
    }

    /**
     * Add near local candidate.
     *
     * @param dhtNodeId DHT node ID.
     * @param threadId Owning thread ID.
     * @param ver Lock version.
     * @param timeout Timeout to acquire lock.
     * @param reenter Reentry flag.
     * @param tx Transaction flag.
     * @param implicitSingle Implicit flag.
     * @return New candidate.
     * @throws GridCacheEntryRemovedException If entry has been removed.
     */
    @Nullable public GridCacheMvccCandidate<K> addNearLocal(
        @Nullable UUID dhtNodeId,
        long threadId,
        GridCacheVersion ver,
        long timeout,
        boolean reenter,
        boolean tx,
        boolean implicitSingle)
        throws GridCacheEntryRemovedException {
        GridCacheMvccCandidate<K> prev;
        GridCacheMvccCandidate<K> owner;
        GridCacheMvccCandidate<K> cand;

        V val;

        UUID locId = cctx.nodeId();

        synchronized (this) {
            checkObsolete();

            GridCacheMvcc<K> mvcc = mvccExtras();

            if (mvcc == null) {
                mvcc = new GridCacheMvcc<>(cctx);

                mvccExtras(mvcc);
            }

            GridCacheMvccCandidate<K> c = mvcc.localCandidate(locId, threadId);

            // This thread already has a candidate: either re-enter it or refuse.
            if (c != null)
                return reenter ? c.reenter() : null;

            prev = mvcc.anyOwner();

            boolean emptyBefore = mvcc.isEmpty();

            // Lock could not be acquired.
            if (timeout < 0 && !emptyBefore)
                return null;

            // Local lock for near cache is a local lock.
            cand = mvcc.addNearLocal(this, locId, dhtNodeId, threadId, ver, timeout, tx, implicitSingle);

            owner = mvcc.anyOwner();

            boolean emptyAfter = mvcc.isEmpty();

            checkCallbacks(emptyBefore, emptyAfter);

            val = this.val;

            if (emptyAfter)
                mvccExtras(null);
        }

        // This call must be outside of synchronization.
        checkOwnerChanged(prev, owner, val);

        return cand;
    }

    /**
     * @param ver Version to set DHT node ID for.
     * @param dhtNodeId DHT node ID.
     * @return Candidate with the given version, or {@code null} if no such candidate was found.
     * @throws GridCacheEntryRemovedException If entry is removed.
     */
    @Nullable public synchronized GridCacheMvccCandidate<K> dhtNodeId(GridCacheVersion ver, UUID dhtNodeId)
        throws GridCacheEntryRemovedException {
        checkObsolete();

        GridCacheMvcc<K> mvcc = mvccExtras();

        GridCacheMvccCandidate<K> cand = mvcc == null ? null : mvcc.candidate(ver);

        if (cand == null)
            return null;

        cand.otherNodeId(dhtNodeId);

        return cand;
    }

    /**
     * Unlocks local lock.
     *
     * @return Removed candidate, or <tt>null</tt> if thread still holds the lock.
     */
    @Nullable @Override public GridCacheMvccCandidate<K> removeLock() {
        GridCacheMvccCandidate<K> prev = null;
        GridCacheMvccCandidate<K> owner = null;

        V val;

        UUID locId = cctx.nodeId();

        GridCacheMvccCandidate<K> cand = null;

        synchronized (this) {
            GridCacheMvcc<K> mvcc = mvccExtras();

            if (mvcc != null) {
                prev = mvcc.anyOwner();

                boolean emptyBefore = mvcc.isEmpty();

                cand = mvcc.localCandidate(locId, Thread.currentThread().getId());

                assert cand == null || cand.nearLocal();

                if (cand != null && cand.owner()) {
                    // If a reentry, then release reentry. Otherwise, remove lock.
                    GridCacheMvccCandidate<K> reentry = cand.unenter();

                    if (reentry != null) {
                        assert reentry.reentry();

                        return reentry;
                    }

                    mvcc.remove(cand.version());

                    owner = mvcc.anyOwner();
                }
                else
                    return null;

                boolean emptyAfter = mvcc.isEmpty();

                checkCallbacks(emptyBefore, emptyAfter);

                if (emptyAfter)
                    mvccExtras(null);
            }

            val = this.val;
        }

        // NOTE(review): when no MVCC extras exist 'cand' is still null at this point, so the
        // assertion below appears to rely on callers invoking this only while a lock is held - confirm.
        assert cand != null;
        assert owner != prev;

        if (log.isDebugEnabled())
            log.debug("Released local candidate from entry [owner=" + owner + ", prev=" + prev +
                ", entry=" + this + ']');

        cctx.mvcc().removeExplicitLock(cand);

        if (prev != null && owner != prev)
            checkThreadChain(prev);

        // This call must be outside of synchronization.
        checkOwnerChanged(prev, owner, val);

        return owner != prev ? prev : null;
    }

    /** {@inheritDoc} */
    @Override protected void onInvalidate() {
        dhtVer = null;
    }

    /** {@inheritDoc} */
    @Override public CacheEntry<K, V> wrap(boolean prjAware) {
        GridCacheProjectionImpl<K, V> prjPerCall = null;

        if (prjAware)
            prjPerCall = cctx.projectionPerCall();

        return new GridPartitionedCacheEntryImpl<>(prjPerCall, cctx, key, this);
    }

    /** {@inheritDoc} */
    @Override public synchronized String toString() {
        return S.toString(GridNearCacheEntry.class, this, "super", super.toString());
    }
}