http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 0000000,88e563d..574f6e9
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -1,0 -1,4182 +1,4203 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.extras.*;
+ import org.apache.ignite.internal.processors.cache.query.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.dr.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.offheap.unsafe.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+ import sun.misc.*;
+
+ import javax.cache.expiry.*;
+ import javax.cache.processor.*;
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+
+ import static org.apache.ignite.cache.CacheFlag.*;
+ import static org.apache.ignite.transactions.IgniteTxState.*;
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
+
+ /**
+ * Adapter for cache entry.
+ */ + @SuppressWarnings({ + "NonPrivateFieldAccessedInSynchronizedContext", "TooBroadScope", "FieldAccessedSynchronizedAndUnsynchronized"}) + public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** */ + private static final byte IS_DELETED_MASK = 0x01; + + /** */ + private static final byte IS_UNSWAPPED_MASK = 0x02; + + /** */ + private static final Comparator<GridCacheVersion> ATOMIC_VER_COMPARATOR = new GridCacheAtomicVersionComparator(); + + /** + * NOTE + * ==== + * Make sure to recalculate this value any time when adding or removing fields from entry. + * The size should be count as follows: + * <ul> + * <li>Primitives: byte/boolean = 1, short = 2, int/float = 4, long/double = 8</li> + * <li>References: 8 each</li> + * <li>Each nested object should be analyzed in the same way as above.</li> + * </ul> + */ + private static final int SIZE_OVERHEAD = 87 /*entry*/ + 32 /* version */; + + /** Static logger to avoid re-creation. Made static for test purpose. */ + protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + protected static volatile IgniteLogger log; + + /** Cache registry. */ + @GridToStringExclude + protected final GridCacheContext<K, V> cctx; + + /** Key. */ + @GridToStringInclude + protected final K key; + + /** Value. */ + @GridToStringInclude + protected V val; + + /** Start version. */ + @GridToStringInclude + protected final long startVer; + + /** Version. */ + @GridToStringInclude + protected GridCacheVersion ver; + + /** Next entry in the linked list. */ + @GridToStringExclude + private volatile GridCacheMapEntry<K, V> next0; + + /** Next entry in the linked list. */ + @GridToStringExclude + private volatile GridCacheMapEntry<K, V> next1; + + /** Key hash code. */ + @GridToStringInclude + private final int hash; + + /** Key bytes. */ + @GridToStringExclude + private volatile byte[] keyBytes; + + /** Value bytes. */ + @GridToStringExclude + protected byte[] valBytes; + + /** Off-heap value pointer. */ + private long valPtr; + + /** Extras */ + @GridToStringInclude + private GridCacheEntryExtras<K> extras; + + /** + * Flags: + * <ul> + * <li>Deleted flag - mask {@link #IS_DELETED_MASK}</li> + * <li>Unswapped flag - mask {@link #IS_UNSWAPPED_MASK}</li> + * </ul> + */ + @GridToStringInclude + protected byte flags; + + /** + * @param cctx Cache context. + * @param key Cache key. + * @param hash Key hash value. + * @param val Entry value. + * @param next Next entry in the linked list. + * @param ttl Time to live. + * @param hdrId Header id. + */ + protected GridCacheMapEntry(GridCacheContext<K, V> cctx, K key, int hash, V val, + GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class); + + if (cctx.portableEnabled()) + key = (K)cctx.kernalContext().portable().detachPortable(key); + + this.key = key; + this.hash = hash; + this.cctx = cctx; + + ttlAndExpireTimeExtras(ttl, toExpireTime(ttl)); + + if (cctx.portableEnabled()) + val = (V)cctx.kernalContext().portable().detachPortable(val); + + synchronized (this) { + value(val, null); + } + + next(hdrId, next); + + ver = cctx.versions().next(); + + startVer = ver.order(); + } + + /** {@inheritDoc} */ + @Override public long startVersion() { + return startVer; + } + + /** + * Sets entry value. 
If off-heap value storage is enabled, will serialize value to off-heap. + * + * @param val Value to store. + * @param valBytes Value bytes to store. + */ + protected void value(@Nullable V val, @Nullable byte[] valBytes) { + assert Thread.holdsLock(this); + + // In case we deal with GGFS cache, count updated data + if (cctx.cache().isGgfsDataCache() && cctx.kernalContext().ggfsHelper().isGgfsBlockKey(key())) { + int newSize = valueLength((byte[])val, valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : + GridCacheValueBytes.nil()); + int oldSize = valueLength((byte[])this.val, this.val == null ? valueBytesUnlocked() : + GridCacheValueBytes.nil()); + + int delta = newSize - oldSize; + + if (delta != 0 && !cctx.isNear()) + cctx.cache().onGgfsDataSizeChanged(delta); + } + + if (!isOffHeapValuesOnly()) { + this.val = val; + this.valBytes = isStoreValueBytes() ? valBytes : null; + + valPtr = 0; + } + else { + try { + if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) { + if (val != null || valBytes != null) { + if (val == null) + val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); + + if (val != null) + cctx.gridDeploy().deploy(val.getClass(), val.getClass().getClassLoader()); + } + + if (U.p2pLoader(val)) { + cctx.deploy().addDeploymentContext( + new GridDeploymentInfoBean((GridDeploymentInfo)val.getClass().getClassLoader())); + } + } + + GridUnsafeMemory mem = cctx.unsafeMemory(); + + assert mem != null; + + if (val != null || valBytes != null) { + boolean valIsByteArr = val instanceof byte[]; + + if (valBytes == null && !valIsByteArr) + valBytes = CU.marshal(cctx.shared(), val); + + valPtr = mem.putOffHeap(valPtr, valIsByteArr ? (byte[])val : valBytes, valIsByteArr); + } + else { + mem.removeOffHeap(valPtr); + + valPtr = 0; + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to deserialize value [entry=" + this + ", val=" + val + ']'); + + throw new IgniteException(e); + } + } + } + + /** + * Isolated method to get length of GGFS block. + * + * @param val Value. + * @param valBytes Value bytes. + * @return Length of value. + */ + private int valueLength(@Nullable byte[] val, GridCacheValueBytes valBytes) { + assert valBytes != null; + + return val != null ? val.length : valBytes.isNull() ? 0 : valBytes.get().length - (valBytes.isPlain() ? 0 : 6); + } + + /** + * @return Value bytes. + */ + protected GridCacheValueBytes valueBytesUnlocked() { + assert Thread.holdsLock(this); + + if (!isOffHeapValuesOnly()) { + if (valBytes != null) + return GridCacheValueBytes.marshaled(valBytes); + + try { + if (valPtr != 0 && cctx.offheapTiered()) + return offheapValueBytes(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + else { + if (valPtr != 0) { + GridUnsafeMemory mem = cctx.unsafeMemory(); + + assert mem != null; + + return mem.getOffHeap(valPtr); + } + } + + return GridCacheValueBytes.nil(); + } + + /** {@inheritDoc} */ + @Override public int memorySize() throws IgniteCheckedException { + byte[] kb; + GridCacheValueBytes vb; + + V v; + + int extrasSize; + + synchronized (this) { + kb = keyBytes; + vb = valueBytesUnlocked(); + + v = val; + + extrasSize = extrasSize(); + } + + if (kb == null || (vb.isNull() && v != null)) { + if (kb == null) + kb = CU.marshal(cctx.shared(), key); + + if (vb.isNull()) + vb = (v != null && v instanceof byte[]) ? 
GridCacheValueBytes.plain(v) : + GridCacheValueBytes.marshaled(CU.marshal(cctx.shared(), v)); + + synchronized (this) { + if (keyBytes == null) + keyBytes = kb; + + // If value didn't change. + if (!isOffHeapValuesOnly() && valBytes == null && val == v && cctx.config().isStoreValueBytes()) + valBytes = vb.isPlain() ? null : vb.get(); + } + } + + return SIZE_OVERHEAD + extrasSize + kb.length + (vb.isNull() ? 0 : vb.get().length); + } + + /** {@inheritDoc} */ + @Override public boolean isInternal() { + return key instanceof GridCacheInternal; + } + + /** {@inheritDoc} */ + @Override public boolean isDht() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isLocal() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isNear() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isReplicated() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean detached() { + return false; + } + + /** {@inheritDoc} */ + @Override public GridCacheContext<K, V> context() { + return cctx; + } + + /** {@inheritDoc} */ + @Override public boolean isNew() throws GridCacheEntryRemovedException { + assert Thread.holdsLock(this); + + checkObsolete(); + + return isStartVersion(); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean isNewLocked() throws GridCacheEntryRemovedException { + checkObsolete(); + + return isStartVersion(); + } + + /** + * @return {@code True} if start version. + */ + public boolean isStartVersion() { + return ver.nodeOrder() == cctx.localNode().order() && ver.order() == startVer; + } + + /** {@inheritDoc} */ + @Override public boolean valid(long topVer) { + return true; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return 0; + } + + /** {@inheritDoc} */ + @Override public boolean partitionValid() { + return true; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheEntryInfo<K, V> info() { + GridCacheEntryInfo<K, V> info = null; + + long time = U.currentTimeMillis(); + + try { + synchronized (this) { + if (!obsolete()) { + info = new GridCacheEntryInfo<>(); + + info.key(key); + + long expireTime = expireTimeExtras(); + + boolean expired = expireTime != 0 && expireTime <= time; + + info.keyBytes(keyBytes); + info.ttl(ttlExtras()); + info.expireTime(expireTime); + info.version(ver); + info.setNew(isStartVersion()); + info.setDeleted(deletedUnlocked()); + + if (!expired) { + info.value(cctx.kernalContext().config().isPeerClassLoadingEnabled() ? + rawGetOrUnmarshalUnlocked(false) : val); + + GridCacheValueBytes valBytes = valueBytesUnlocked(); + + if (!valBytes.isNull()) { + if (valBytes.isPlain()) + info.value((V)valBytes.get()); + else + info.valueBytes(valBytes.get()); + } + } + } + } + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal object while creating entry info: " + this, e); + } + + return info; + } + + /** {@inheritDoc} */ + @Override public V unswap() throws IgniteCheckedException { + return unswap(false, true); + } + + /** + * Unswaps an entry. + * + * @param ignoreFlags Whether to ignore swap flags. + * @param needVal If {@code false} then do not to deserialize value during unswap. + * @return Value. + * @throws IgniteCheckedException If failed. 
+ */ + @Nullable @Override public V unswap(boolean ignoreFlags, boolean needVal) throws IgniteCheckedException { + boolean swapEnabled = cctx.swap().swapEnabled() && (ignoreFlags || !cctx.hasFlag(SKIP_SWAP)); + + if (!swapEnabled && !cctx.isOffHeapEnabled()) + return null; + + synchronized (this) { + if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) { + GridCacheSwapEntry<V> e; + + if (cctx.offheapTiered()) { + e = cctx.swap().readOffheapPointer(this); + + if (e != null) { + if (e.offheapPointer() > 0) { + valPtr = e.offheapPointer(); + + if (needVal) { + V val = unmarshalOffheap(false); + + e.value(val); + } + } + else { // Read from swap. + valPtr = 0; + + if (cctx.portableEnabled() && !e.valueIsByteArray()) + e.valueBytes(null); // Clear bytes marshalled with portable marshaller. + } + } + } + else + e = detached() ? cctx.swap().read(this, true) : cctx.swap().readAndRemove(this); + + if (log.isDebugEnabled()) + log.debug("Read swap entry [swapEntry=" + e + ", cacheEntry=" + this + ']'); + + flags |= IS_UNSWAPPED_MASK; + + // If there is a value. + if (e != null) { + long delta = e.expireTime() == 0 ? 0 : e.expireTime() - U.currentTimeMillis(); + + if (delta >= 0) { + V val = e.value(); + + if (cctx.portableEnabled()) + val = (V)cctx.kernalContext().portable().detachPortable(val); + + // Set unswapped value. + update(val, e.valueBytes(), e.expireTime(), e.ttl(), e.version()); + + // Must update valPtr again since update() will reset it. + if (cctx.offheapTiered() && e.offheapPointer() > 0) + valPtr = e.offheapPointer(); + + return val; + } + else + clearIndex(e.value()); + } + } + } + + return null; + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void swap() throws IgniteCheckedException { + if (cctx.isSwapOrOffheapEnabled() && !deletedUnlocked() && hasValueUnlocked() && !detached()) { + assert Thread.holdsLock(this); + + long expireTime = expireTimeExtras(); + + if (expireTime > 0 && U.currentTimeMillis() >= expireTime) { // Don't swap entry if it's expired. + // Entry might have been updated. + if (cctx.offheapTiered()) { + cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); + + valPtr = 0; + } + + return; + } + + if (val == null && cctx.offheapTiered() && valPtr != 0) { + if (log.isDebugEnabled()) + log.debug("Value did not change, skip write swap entry: " + this); + + if (cctx.swap().offheapEvictionEnabled()) + cctx.swap().enableOffheapEviction(key(), getOrMarshalKeyBytes()); + + return; + } + + boolean plain = val instanceof byte[]; + + IgniteUuid valClsLdrId = null; + + if (val != null) + valClsLdrId = cctx.deploy().getClassLoaderId(val.getClass().getClassLoader()); + + cctx.swap().write(key(), + getOrMarshalKeyBytes(), + plain ? ByteBuffer.wrap((byte[])val) : swapValueBytes(), + plain, + ver, + ttlExtras(), + expireTime, + cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key)), + valClsLdrId); + + if (log.isDebugEnabled()) + log.debug("Wrote swap entry: " + this); + } + } + + /** + * @throws IgniteCheckedException If failed. + */ + protected final void releaseSwap() throws IgniteCheckedException { + if (cctx.isSwapOrOffheapEnabled()) { + synchronized (this){ + cctx.swap().remove(key(), getOrMarshalKeyBytes()); + } + + if (log.isDebugEnabled()) + log.debug("Removed swap entry [entry=" + this + ']'); + } + } + + /** + * @param tx Transaction. + * @param key Key. + * @param reload flag. + * @param filter Filter. + * @param subjId Subject ID. + * @param taskName Task name. + * @return Read value. 
+ * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings({"RedundantTypeArguments"}) + @Nullable protected V readThrough(@Nullable IgniteTxEx<K, V> tx, K key, boolean reload, + IgnitePredicate<CacheEntry<K, V>>[] filter, UUID subjId, String taskName) throws IgniteCheckedException { + return cctx.store().loadFromStore(tx, key); + } + + /** {@inheritDoc} */ + @Nullable @Override public final V innerGet(@Nullable IgniteTxEx<K, V> tx, + boolean readSwap, + boolean readThrough, + boolean failFast, + boolean unmarshal, + boolean updateMetrics, + boolean evt, + boolean tmp, + UUID subjId, + Object transformClo, + String taskName, + IgnitePredicate<CacheEntry<K, V>>[] filter, + @Nullable IgniteCacheExpiryPolicy expirePlc) + throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException { + cctx.denyOnFlag(LOCAL); + + return innerGet0(tx, - readSwap, - readThrough, - evt, - failFast, - unmarshal, - updateMetrics, - tmp, - subjId, - transformClo, - taskName, - filter, - expirePlc); ++ readSwap, ++ readThrough, ++ evt, ++ failFast, ++ unmarshal, ++ updateMetrics, ++ tmp, ++ subjId, ++ transformClo, ++ taskName, ++ filter, ++ expirePlc); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "RedundantTypeArguments", "TooBroadScope"}) + private V innerGet0(IgniteTxEx<K, V> tx, + boolean readSwap, + boolean readThrough, + boolean evt, + boolean failFast, + boolean unmarshal, + boolean updateMetrics, + boolean tmp, + UUID subjId, + Object transformClo, + String taskName, + IgnitePredicate<CacheEntry<K, V>>[] filter, + @Nullable IgniteCacheExpiryPolicy expiryPlc) + throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException { + // Disable read-through if there is no store. + if (readThrough && !cctx.readThrough()) + readThrough = false; + + GridCacheMvccCandidate<K> owner; + + V old; + V ret = null; + + if (!F.isEmptyOrNulls(filter) && !cctx.isAll( + (new GridCacheFilterEvaluationEntry<>(key, rawGetOrUnmarshal(true), this, true)), filter)) + return CU.<V>failed(failFast); + + GridCacheVersion startVer; + + boolean expired = false; + + V expiredVal = null; + + boolean hasOldBytes; + + synchronized (this) { + checkObsolete(); + + // Cache version for optimistic check. + startVer = ver; + + GridCacheMvcc<K> mvcc = mvccExtras(); + + owner = mvcc == null ? null : mvcc.anyOwner(); + + double delta; + + long expireTime = expireTimeExtras(); + + if (expireTime > 0) { + delta = expireTime - U.currentTimeMillis(); + + if (log.isDebugEnabled()) + log.debug("Checked expiration time for entry [timeLeft=" + delta + ", entry=" + this + ']'); + + if (delta <= 0) + expired = true; + } + + V val = this.val; + + hasOldBytes = valBytes != null || valPtr != 0; + + if ((unmarshal || isOffHeapValuesOnly()) && !expired && val == null && hasOldBytes) + val = rawGetOrUnmarshalUnlocked(tmp); + + boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion()); + + // Attempt to load from swap. + if (val == null && !hasOldBytes && readSwap) { + // Only promote when loading initial state. + if (isNew() || !valid) { + // If this entry is already expired (expiration time was too low), + // we simply remove from swap and clear index. + if (expired) { + releaseSwap(); + + // Previous value is guaranteed to be null + clearIndex(null); + } + else { + // Read and remove swap entry. 
+ if (tmp) { + unswap(false, false); + + val = rawGetOrUnmarshalUnlocked(true); + } + else + val = unswap(); + + // Recalculate expiration after swap read. + if (expireTime > 0) { + delta = expireTime - U.currentTimeMillis(); + + if (log.isDebugEnabled()) + log.debug("Checked expiration time for entry [timeLeft=" + delta + + ", entry=" + this + ']'); + + if (delta <= 0) + expired = true; + } + } + } + } + + old = expired || !valid ? null : val; + + if (expired) { + expiredVal = val; + + value(null, null); + } + + if (old == null && !hasOldBytes) { - if (updateMetrics) ++ if (updateMetrics && cctx.cache().configuration().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(false); + } + else { - if (updateMetrics) ++ if (updateMetrics && cctx.cache().configuration().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(true); + + // Set retVal here for event notification. + ret = old; + } + + if (evt && expired) { + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) { + cctx.events().addEvent(partition(), + key, + tx, + owner, + EVT_CACHE_OBJECT_EXPIRED, + null, + false, + expiredVal, + expiredVal != null || hasOldBytes, + subjId, + null, + taskName); + } + + cctx.continuousQueries().onEntryExpired(this, key, expiredVal, null); + + // No more notifications. + evt = false; + } + + if (evt && !expired && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + cctx.events().addEvent(partition(), key, tx, owner, EVT_CACHE_OBJECT_READ, ret, ret != null, old, + hasOldBytes || old != null, subjId, + transformClo != null ? transformClo.getClass().getName() : null, taskName); + + // No more notifications. + evt = false; + } + + if (ret != null && expiryPlc != null) { + long ttl = expiryPlc.forAccess(); + + updateTtl(ttl); + + expiryPlc.ttlUpdated(key(), + getOrMarshalKeyBytes(), + version(), + hasReaders() ? ((GridDhtCacheEntry) this).readers() : null); + } + } + + // Check before load. + if (!cctx.isAll(this, filter)) + return CU.<V>failed(failFast, ret); + + if (ret != null) { + // If return value is consistent, then done. + if (F.isEmptyOrNulls(filter) || version().equals(startVer)) + return ret; + + // Try again (recursion). + return innerGet0(tx, + readSwap, + readThrough, + false, + failFast, + unmarshal, + updateMetrics, + tmp, + subjId, + transformClo, + taskName, + filter, + expiryPlc); + } + + boolean loadedFromStore = false; + + if (ret == null && readThrough) { + IgniteTxEx tx0 = null; + + if (tx != null && tx.local()) { + if (cctx.isReplicated() || cctx.isColocated() || tx.near()) + tx0 = tx; + else if (tx.dht()) { + GridCacheVersion ver = ((GridDhtTxLocalAdapter)tx).nearXidVersion(); + + tx0 = cctx.dht().near().context().tm().tx(ver); + } + } + + ret = readThrough(tx0, key, false, filter, subjId, taskName); + + loadedFromStore = true; + } + + boolean match = false; + + synchronized (this) { + long ttl = ttlExtras(); + + // If version matched, set value. + if (startVer.equals(ver)) { + match = true; + + if (ret != null) { + // Detach value before index update. + if (cctx.portableEnabled()) + ret = (V)cctx.kernalContext().portable().detachPortable(ret); + + GridCacheVersion nextVer = nextVersion(); + + V prevVal = rawGetOrUnmarshalUnlocked(false); + + long expTime = toExpireTime(ttl); + + if (loadedFromStore) + // Update indexes before actual write to entry. + updateIndex(ret, null, expTime, nextVer, prevVal); + + boolean hadValPtr = valPtr != 0; + + // Don't change version for read-through. 
+ update(ret, null, expTime, ttl, nextVer); + + if (hadValPtr && cctx.offheapTiered()) + cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); + + if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) + deletedUnlocked(false); + } + + if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) + cctx.events().addEvent(partition(), key, tx, owner, EVT_CACHE_OBJECT_READ, ret, ret != null, + old, hasOldBytes, subjId, transformClo != null ? transformClo.getClass().getName() : null, + taskName); + } + } + + if (F.isEmptyOrNulls(filter) || match) + return ret; + + // Try again (recursion). + return innerGet0(tx, + readSwap, + readThrough, + false, + failFast, + unmarshal, + updateMetrics, + tmp, + subjId, + transformClo, + taskName, + filter, + expiryPlc); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "TooBroadScope"}) + @Nullable @Override public final V innerReload(IgnitePredicate<CacheEntry<K, V>>[] filter) + throws IgniteCheckedException, GridCacheEntryRemovedException { + cctx.denyOnFlag(READ); + + CU.checkStore(cctx); + + GridCacheVersion startVer; + + boolean wasNew; + + synchronized (this) { + checkObsolete(); + + // Cache version for optimistic check. + startVer = ver; + + wasNew = isNew(); + } + + String taskName = cctx.kernalContext().job().currentTaskName(); + + // Check before load. + if (cctx.isAll(this, filter)) { + V ret = readThrough(null, key, true, filter, cctx.localNodeId(), taskName); + + boolean touch = false; + + try { + synchronized (this) { + long ttl = ttlExtras(); + + // Generate new version. + GridCacheVersion nextVer = cctx.versions().nextForLoad(ver); + + // If entry was loaded during read step. + if (wasNew && !isNew()) + // Map size was updated on entry creation. + return ret; + + // If version matched, set value. + if (startVer.equals(ver)) { + releaseSwap(); + + V old = rawGetOrUnmarshalUnlocked(false); + + long expTime = toExpireTime(ttl); + + // Detach value before index update. + if (cctx.portableEnabled()) + ret = (V)cctx.kernalContext().portable().detachPortable(ret); + + // Update indexes. + if (ret != null) { + updateIndex(ret, null, expTime, nextVer, old); + + if (cctx.deferredDelete() && !isInternal() && !detached() && deletedUnlocked()) + deletedUnlocked(false); + } + else { + clearIndex(old); + + if (cctx.deferredDelete() && !isInternal() && !detached() && !deletedUnlocked()) + deletedUnlocked(true); + } + + update(ret, null, expTime, ttl, nextVer); + + touch = true; + + // If value was set - return, otherwise try again. + return ret; + } + } + + if (F.isEmptyOrNulls(filter)) { + touch = true; + + return ret; + } + } + finally { + if (touch) + cctx.evicts().touch(this, cctx.affinity().affinityTopologyVersion()); + } + + // Recursion. + return innerReload(filter); + } + + // If filter didn't pass. + return null; + } + + /** + * @param nodeId Node ID. + */ + protected void recordNodeId(UUID nodeId) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public final GridCacheUpdateTxResult<V> innerSet( + @Nullable IgniteTxEx<K, V> tx, + UUID evtNodeId, + UUID affNodeId, + V val, + @Nullable byte[] valBytes, + boolean writeThrough, + boolean retval, + long ttl, + boolean evt, + boolean metrics, + long topVer, + IgnitePredicate<CacheEntry<K, V>>[] filter, + GridDrType drType, + long drExpireTime, + @Nullable GridCacheVersion explicitVer, + @Nullable UUID subjId, + String taskName + ) throws IgniteCheckedException, GridCacheEntryRemovedException { + V old; + + boolean valid = valid(tx != null ? 
tx.topologyVersion() : topVer); + + // Lock should be held by now. + if (!cctx.isAll(this, filter)) + return new GridCacheUpdateTxResult<>(false, null); + + final GridCacheVersion newVer; + + boolean intercept = cctx.config().getInterceptor() != null; + + synchronized (this) { + checkObsolete(); + + if (cctx.kernalContext().config().isCacheSanityCheckEnabled()) { + if (tx != null && tx.groupLock()) + groupLockSanityCheck(tx); + else + assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : + "Transaction does not own lock for update [entry=" + this + ", tx=" + tx + ']'; + } + + // Load and remove from swap if it is new. + boolean startVer = isStartVersion(); + + if (startVer) + unswap(true, retval); + + newVer = explicitVer != null ? explicitVer : tx == null ? + nextVersion() : tx.writeVersion(); + + assert newVer != null : "Failed to get write version for tx: " + tx; + + if (tx != null && !tx.local() && tx.onePhaseCommit() && explicitVer == null) { + if (!(isNew() || !valid) && ver.compareTo(newVer) > 0) { + if (log.isDebugEnabled()) + log.debug("Skipping entry update for one-phase commit since current entry version is " + + "greater than write version [entry=" + this + ", newVer=" + newVer + ']'); + + return new GridCacheUpdateTxResult<>(false, null); + } + } + + old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val; + + GridCacheValueBytes oldBytes = valueBytesUnlocked(); + + if (intercept) { + V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, old, val); + + if (interceptorVal == null) + return new GridCacheUpdateTxResult<>(false, cctx.<V>unwrapTemporary(old)); + else if (interceptorVal != val) { + val = cctx.unwrapTemporary(interceptorVal); + + valBytes = null; + } + } + + // Determine new ttl and expire time. + long expireTime; + + if (drExpireTime >= 0) { + assert ttl >= 0 : ttl; + + expireTime = drExpireTime; + } + else { + if (ttl == -1L) { + ttl = ttlExtras(); + expireTime = expireTimeExtras(); + } + else + expireTime = toExpireTime(ttl); + } + + assert ttl >= 0; + assert expireTime >= 0; + + // Detach value before index update. + if (cctx.portableEnabled()) + val = (V)cctx.kernalContext().portable().detachPortable(val); + + // Update index inside synchronization since it can be updated + // in load methods without actually holding entry lock. + if (val != null || valBytes != null) { + updateIndex(val, valBytes, expireTime, newVer, old); + + if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) + deletedUnlocked(false); + } + + update(val, valBytes, expireTime, ttl, newVer); + + drReplicate(drType, val, valBytes, newVer); + + recordNodeId(affNodeId); + - if (metrics) ++ if (metrics && cctx.cache().configuration().isStatisticsEnabled()) + cctx.cache().metrics0().onWrite(); + + if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { + V evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, evtNodeId, tx == null ? 
null : tx.xid(), + newVer, EVT_CACHE_OBJECT_PUT, val, val != null, evtOld, evtOld != null || hasValueUnlocked(), + subjId, null, taskName); + } + + CacheMode mode = cctx.config().getCacheMode(); + + if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED || + (tx != null && tx.local() && !isNear())) + cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + + cctx.dataStructures().onEntryUpdated(key, false); + } + + if (log.isDebugEnabled()) + log.debug("Updated cache entry [val=" + val + ", old=" + old + ", entry=" + this + ']'); + + // Persist outside of synchronization. The correctness of the + // value will be handled by current transaction. + if (writeThrough) + cctx.store().putToStore(tx, key, val, newVer); + + if (intercept) + cctx.config().getInterceptor().onAfterPut(key, val); + + return valid ? new GridCacheUpdateTxResult<>(true, retval ? old : null) : + new GridCacheUpdateTxResult<V>(false, null); + } + + /** {@inheritDoc} */ + @Override public final GridCacheUpdateTxResult<V> innerRemove( + @Nullable IgniteTxEx<K, V> tx, + UUID evtNodeId, + UUID affNodeId, + boolean writeThrough, + boolean retval, + boolean evt, + boolean metrics, + long topVer, + IgnitePredicate<CacheEntry<K, V>>[] filter, + GridDrType drType, + @Nullable GridCacheVersion explicitVer, + @Nullable UUID subjId, + String taskName + ) throws IgniteCheckedException, GridCacheEntryRemovedException { + assert cctx.transactional(); + + V old; + + GridCacheVersion newVer; + + boolean valid = valid(tx != null ? tx.topologyVersion() : topVer); + + // Lock should be held by now. + if (!cctx.isAll(this, filter)) + return new GridCacheUpdateTxResult<>(false, null); + + GridCacheVersion obsoleteVer = null; + + GridCacheVersion enqueueVer = null; + + boolean intercept = cctx.config().getInterceptor() != null; + + IgniteBiTuple<Boolean, V> interceptRes = null; + + try { + synchronized (this) { + checkObsolete(); + + if (tx != null && tx.groupLock() && cctx.kernalContext().config().isCacheSanityCheckEnabled()) + groupLockSanityCheck(tx); + else + assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : + "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; + + boolean startVer = isStartVersion(); + + if (startVer) { + if (tx != null && !tx.local() && tx.onePhaseCommit()) + // Must promote to check version for one-phase commit tx. + unswap(true, retval); + else + // Release swap. + releaseSwap(); + } + + newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion(); + + if (tx != null && !tx.local() && tx.onePhaseCommit() && explicitVer == null) { + if (!startVer && ver.compareTo(newVer) > 0) { + if (log.isDebugEnabled()) + log.debug("Skipping entry removal for one-phase commit since current entry version is " + + "greater than write version [entry=" + this + ", newVer=" + newVer + ']'); + + return new GridCacheUpdateTxResult<>(false, null); + } + + if (!detached()) + enqueueVer = newVer; + } + + old = (retval || intercept) ? 
rawGetOrUnmarshalUnlocked(!retval) : val; + + if (intercept) { + interceptRes = cctx.config().<K, V>getInterceptor().onBeforeRemove(key, old); + + if (cctx.cancelRemove(interceptRes)) + return new GridCacheUpdateTxResult<>(false, cctx.<V>unwrapTemporary(interceptRes.get2())); + } + + GridCacheValueBytes oldBytes = valueBytesUnlocked(); + + if (old == null) + old = saveValueForIndexUnlocked(); + + // Clear indexes inside of synchronization since indexes + // can be updated without actually holding entry lock. + clearIndex(old); + + boolean hadValPtr = valPtr != 0; + + update(null, null, 0, 0, newVer); + + if (cctx.offheapTiered() && hadValPtr) { + boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); + + assert rmv; + } + + if (cctx.deferredDelete() && !detached() && !isInternal()) { + if (!deletedUnlocked()) { + deletedUnlocked(true); + + if (tx != null) { + GridCacheMvcc<K> mvcc = mvccExtras(); + + if (mvcc == null || mvcc.isEmpty(tx.xidVersion())) + clearReaders(); + else + clearReader(tx.originatingNodeId()); + } + } + } + + drReplicate(drType, null, null, newVer); + - if (metrics) - cctx.cache().metrics0().onWrite(); ++ if (metrics && cctx.cache().configuration().isStatisticsEnabled()) ++ cctx.cache().metrics0().onRemove(); + + if (tx == null) + obsoleteVer = newVer; + else { + // Only delete entry if the lock is not explicit. + if (tx.groupLock() || lockedBy(tx.xidVersion())) + obsoleteVer = tx.xidVersion(); + else if (log.isDebugEnabled()) + log.debug("Obsolete version was not set because lock was explicit: " + this); + } + + if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { + V evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, evtNodeId, tx == null ? null : tx.xid(), newVer, + EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hasValueUnlocked(), subjId, + null, taskName); + } + + CacheMode mode = cctx.config().getCacheMode(); + + if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED || + (tx != null && tx.local() && !isNear())) + cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false); + + cctx.dataStructures().onEntryUpdated(key, true); + } + } + finally { + if (enqueueVer != null) { + assert cctx.deferredDelete(); + + cctx.onDeferredDelete(this, enqueueVer); + } + } + + // Persist outside of synchronization. The correctness of the + // value will be handled by current transaction. + if (writeThrough) + cctx.store().removeFromStore(tx, key); + + if (!cctx.deferredDelete()) { + boolean marked = false; + + synchronized (this) { + // If entry is still removed. + if (newVer == ver) { + if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true))) { + if (log.isDebugEnabled()) + log.debug("Entry could not be marked obsolete (it is still used): " + this); + } + else { + recordNodeId(affNodeId); + + // If entry was not marked obsolete, then removed lock + // will be registered whenever removeLock is called. + cctx.mvcc().addRemoved(cctx, obsoleteVer); + + if (log.isDebugEnabled()) + log.debug("Entry was marked obsolete: " + this); + } + } + } + + if (marked) + onMarkedObsolete(); + } + + if (intercept) + cctx.config().getInterceptor().onAfterRemove(key, old); + + return valid ? + new GridCacheUpdateTxResult<>(true, + cctx.<V>unwrapTemporary(interceptRes != null ? 
interceptRes.get2() : old)) : + new GridCacheUpdateTxResult<V>(false, null); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public GridTuple3<Boolean, V, EntryProcessorResult<Object>> innerUpdateLocal( + GridCacheVersion ver, + GridCacheOperation op, + @Nullable Object writeObj, + @Nullable Object[] invokeArgs, + boolean writeThrough, + boolean retval, + @Nullable ExpiryPolicy expiryPlc, + boolean evt, + boolean metrics, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter, + boolean intercept, + @Nullable UUID subjId, + String taskName + ) throws IgniteCheckedException, GridCacheEntryRemovedException { + assert cctx.isLocal() && cctx.atomic(); + + V old; + + boolean res = true; + + IgniteBiTuple<Boolean, ?> interceptorRes = null; + + EntryProcessorResult<Object> invokeRes = null; + + synchronized (this) { + boolean needVal = retval || intercept || op == GridCacheOperation.TRANSFORM || !F.isEmpty(filter); + + checkObsolete(); + + // Load and remove from swap if it is new. + if (isNew()) + unswap(true, retval); + + // Possibly get old value form store. + old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; + + GridCacheValueBytes oldBytes = valueBytesUnlocked(); + + if (needVal && old == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { + old = readThrough(null, key, false, CU.<K, V>empty(), subjId, taskName); + + // Detach value before index update. + if (cctx.portableEnabled()) + old = (V)cctx.kernalContext().portable().detachPortable(old); + + if (old != null) + updateIndex(old, null, expireTime(), ver, null); + else + clearIndex(null); + + update(old, null, 0, 0, ver); + } + ++ // Apply metrics. ++ if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) { ++ // PutIfAbsent methods mustn't update hit/miss statistics ++ if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noPeekArray()) ++ cctx.cache().metrics0().onRead(old != null); ++ } ++ + // Check filter inside of synchronization. + if (!F.isEmpty(filter)) { + boolean pass = cctx.isAll(wrapFilterLocked(), filter); + + if (!pass) { + if (expiryPlc != null && hasValueUnlocked()) { + long ttl = CU.toTtl(expiryPlc.getExpiryForAccess()); + + if (ttl != -1L) + updateTtl(ttl); + } + + return new T3<>(false, retval ? old : null, null); + } + } + - // Apply metrics. - if (metrics && needVal) - cctx.cache().metrics0().onRead(old != null); - + String transformCloClsName = null; + + V updated; + + // Calculate new value. + if (op == GridCacheOperation.TRANSFORM) { + transformCloClsName = writeObj.getClass().getName(); + + EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)writeObj; + + assert entryProcessor != null; + + CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(key, old); + + try { + Object computed = entryProcessor.process(entry, invokeArgs); + + updated = cctx.unwrapTemporary(entry.getValue()); + + invokeRes = computed != null ? new CacheInvokeResult<>(cctx.unwrapTemporary(computed)) : null; + } + catch (Exception e) { + updated = old; + + invokeRes = new CacheInvokeResult<>(e); + } + + if (!entry.modified()) + return new GridTuple3<>(false, null, invokeRes); + } + else + updated = (V)writeObj; + + op = updated == null ? 
GridCacheOperation.DELETE : GridCacheOperation.UPDATE; + + if (intercept) { + if (op == GridCacheOperation.UPDATE) { + updated = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated); + + if (updated == null) + return new GridTuple3<>(false, cctx.<V>unwrapTemporary(old), invokeRes); + } + else { + interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key, old); + + if (cctx.cancelRemove(interceptorRes)) + return new GridTuple3<>(false, cctx.<V>unwrapTemporary(interceptorRes.get2()), invokeRes); + } + } + + boolean hadVal = hasValueUnlocked(); + + // Try write-through. + if (op == GridCacheOperation.UPDATE) { + // Detach value before index update. + if (cctx.portableEnabled()) + updated = (V)cctx.kernalContext().portable().detachPortable(updated); + + if (writeThrough) + // Must persist inside synchronization in non-tx mode. + cctx.store().putToStore(null, key, updated, ver); + + long ttl; + long expireTime; + + if (expiryPlc != null) { + ttl = CU.toTtl(hadVal ? expiryPlc.getExpiryForUpdate() : expiryPlc.getExpiryForCreation()); + + if (ttl == -1L) { + ttl = ttlExtras(); + + expireTime = expireTimeExtras(); + } + else + expireTime = toExpireTime(ttl); + } + else { + ttl = ttlExtras(); + + expireTime = expireTimeExtras(); + } + + // Update index inside synchronization since it can be updated + // in load methods without actually holding entry lock. + updateIndex(updated, null, expireTime, ver, old); + + update(updated, null, expireTime, ttl, ver); + + if (evt) { + V evtOld = null; + + if (transformCloClsName != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, + (GridCacheVersion)null, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, + evtOld != null || hadVal, subjId, transformCloClsName, taskName); + } + + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, + (GridCacheVersion)null, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld, + evtOld != null || hadVal, subjId, null, taskName); + } + } + } + else { + if (writeThrough) + // Must persist inside synchronization in non-tx mode. + cctx.store().removeFromStore(null, key); + + // Update index inside synchronization since it can be updated + // in load methods without actually holding entry lock. 
+ clearIndex(old); + + update(null, null, 0, 0, ver); + + if (evt) { + V evtOld = null; + + if (transformCloClsName != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) + cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, + (GridCacheVersion)null, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, + evtOld != null || hadVal, subjId, transformCloClsName, taskName); + + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, (GridCacheVersion) null, + EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal, subjId, null, + taskName); + } + } + + res = hadVal; + } + - if (metrics) - cctx.cache().metrics0().onWrite(); ++ if (res) ++ updateMetrics(op, metrics); + + cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + + cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); + + if (intercept) { + if (op == GridCacheOperation.UPDATE) + cctx.config().getInterceptor().onAfterPut(key, val); + else + cctx.config().getInterceptor().onAfterRemove(key, old); + } + } + + return new GridTuple3<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old), invokeRes); + } + + /** {@inheritDoc} */ + @Override public GridCacheUpdateAtomicResult<K, V> innerUpdate( + GridCacheVersion newVer, + UUID evtNodeId, + UUID affNodeId, + GridCacheOperation op, + @Nullable Object writeObj, + @Nullable byte[] valBytes, + @Nullable Object[] invokeArgs, + boolean writeThrough, + boolean retval, + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean evt, + boolean metrics, + boolean primary, + boolean verCheck, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter, + GridDrType drType, + long drTtl, + long drExpireTime, + @Nullable GridCacheVersion drVer, + boolean drResolve, + boolean intercept, + @Nullable UUID subjId, + String taskName + ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { + assert cctx.atomic(); + + V old; + + boolean res = true; + + V updated; + + GridCacheVersion enqueueVer = null; + + GridDrResolveResult<V> drRes = null; + + EntryProcessorResult<?> invokeRes = null; + + long newTtl = -1L; + long newExpireTime = 0L; + long newDrExpireTime = -1L; // Explicit DR expire time which possibly will be sent to DHT node. + + synchronized (this) { + boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); + + checkObsolete(); + + // Load and remove from swap if it is new. + if (isNew()) + unswap(true, retval); + + Object transformClo = null; + + if (drResolve) { + drRes = cctx.dr().resolveAtomic(this, + op, + writeObj, + valBytes, + expiryPlc, + drTtl, + drExpireTime, + drVer); + + if (drRes != null) { + if (drRes.isUseOld()) { + old = retval ? rawGetOrUnmarshalUnlocked(false) : val; + + return new GridCacheUpdateAtomicResult<>(false, + old, + null, + invokeRes, + -1L, + -1L, + null, + null, + false); + } + + newTtl = drRes.newTtl(); + + newExpireTime = drRes.newExpireTime(); + + newDrExpireTime = drRes.newDrExpireTime(); + + op = drRes.operation(); + + writeObj = drRes.value(); + + valBytes = drRes.valueBytes(); + + if (drRes.isMerge()) + drVer = null; // Update will be considered as local. + } + else + drVer = null; + } + + if (drRes == null) { // Perform version check only in case there will be no explicit conflict resolution. 
+ if (verCheck) { + if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer) >= 0) { + if (ATOMIC_VER_COMPARATOR.compare(ver, newVer) == 0 && cctx.writeThrough() && primary) { + if (log.isDebugEnabled()) + log.debug("Received entry update with same version as current (will update store) " + + "[entry=" + this + ", newVer=" + newVer + ']'); + + V val = rawGetOrUnmarshalUnlocked(false); + + if (val == null) { + assert deletedUnlocked(); + + cctx.store().removeFromStore(null, key()); + } + else + cctx.store().putToStore(null, key(), val, ver); + } + else { + if (log.isDebugEnabled()) + log.debug("Received entry update with smaller version than current (will ignore) " + + "[entry=" + this + ", newVer=" + newVer + ']'); + } + + old = retval ? rawGetOrUnmarshalUnlocked(false) : val; + + return new GridCacheUpdateAtomicResult<>(false, + old, + null, + invokeRes, + -1L, + -1L, + null, + null, + false); + } + } + else + assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer) <= 0 : + "Invalid version for inner update [entry=" + this + ", newVer=" + newVer + ']'; + } + + // Possibly get old value form store. + old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; + + GridCacheValueBytes oldBytes = valueBytesUnlocked(); + + if (needVal && old == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { + old = readThrough(null, key, false, CU.<K, V>empty(), subjId, taskName); + + // Detach value before index update. + if (cctx.portableEnabled()) + old = (V)cctx.kernalContext().portable().detachPortable(old); + + if (old != null) + updateIndex(old, null, expireTime(), ver, null); + else + clearIndex(null); + + update(old, null, 0, 0, ver); + + if (deletedUnlocked() && old != null && !isInternal()) + deletedUnlocked(false); + } + ++ // Apply metrics. ++ if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) { ++ // PutIfAbsent methods mustn't update hit/miss statistics ++ if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noPeekArray()) ++ cctx.cache().metrics0().onRead(old != null); ++ } ++ + // Check filter inside of synchronization. + if (!F.isEmptyOrNulls(filter)) { + boolean pass = cctx.isAll(wrapFilterLocked(), filter); + + if (!pass) { + if (hasValueUnlocked() && expiryPlc != null) { + newTtl = expiryPlc.forAccess(); + + if (newTtl != -1L) { + updateTtl(newTtl); + + expiryPlc.ttlUpdated(key, + getOrMarshalKeyBytes(), + version(), + hasReaders() ? ((GridDhtCacheEntry) this).readers() : null); + } + } + + return new GridCacheUpdateAtomicResult<>(false, + retval ? old : null, + null, + invokeRes, + -1L, + -1L, + null, + null, + false); + } + } + - // Apply metrics. - if (metrics && needVal) - cctx.cache().metrics0().onRead(old != null); - + // Calculate new value. + if (op == GridCacheOperation.TRANSFORM) { + transformClo = writeObj; + + EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)writeObj; + + CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(key, old); + + try { + Object computed = entryProcessor.process(entry, invokeArgs); + + updated = cctx.unwrapTemporary(entry.getValue()); + + if (computed != null) + invokeRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed)); + + valBytes = null; + } + catch (Exception e) { + invokeRes = new CacheInvokeResult<>(e); + + updated = old; + + valBytes = oldBytes.getIfMarshaled(); + } + + if (!entry.modified()) { + return new GridCacheUpdateAtomicResult<>(false, + retval ? 
old : null, + null, + invokeRes, + -1L, + -1L, + null, + null, + false); + } + } + else + updated = (V)writeObj; + + op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE; + + assert op == GridCacheOperation.UPDATE || (op == GridCacheOperation.DELETE && updated == null); + + boolean hadVal = hasValueUnlocked(); + + // Incorporate DR version into new version if needed. + if (drVer != null && drVer != newVer) + newVer = new GridCacheVersionEx(newVer.topologyVersion(), + newVer.globalTime(), + newVer.order(), + newVer.nodeOrder(), + newVer.dataCenterId(), + drVer); + + IgniteBiTuple<Boolean, V> interceptRes = null; + + if (op == GridCacheOperation.UPDATE) { + if (intercept) { + V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated); + + if (interceptorVal == null) + return new GridCacheUpdateAtomicResult<>(false, + retval ? old : null, + null, + invokeRes, + -1L, + -1L, + null, + null, + false); + else if (interceptorVal != updated) { + updated = cctx.unwrapTemporary(interceptorVal); + valBytes = null; + } + } + + long ttl0 = newTtl; + + if (drRes == null) { + // Calculate TTL and expire time for local update. + if (drTtl >= 0L) { + assert drExpireTime >= 0L : drExpireTime; + + ttl0 = drTtl; + newExpireTime = drExpireTime; + } + else { + assert drExpireTime == -1L : drExpireTime; + + if (expiryPlc != null) + newTtl = hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate(); + else + newTtl = -1L; + + if (newTtl == -1L) { + ttl0 = ttlExtras(); + newExpireTime = expireTimeExtras(); + } + else { + ttl0 = newTtl; + newExpireTime = toExpireTime(ttl0); + } + } + } + else if (newTtl == -1L) + ttl0 = ttlExtras(); + + // Try write-through. + if (writeThrough) + // Must persist inside synchronization in non-tx mode. + cctx.store().putToStore(null, key, updated, newVer); + + if (!hadVal) { + boolean new0 = isNew(); + + assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this + ", locNodeId=" + + cctx.localNodeId() + ']'; + + if (!new0 && !isInternal()) + deletedUnlocked(false); + } + else { + assert !deletedUnlocked() : "Invalid entry [entry=" + this + + ", locNodeId=" + cctx.localNodeId() + ']'; + + // Do not change size. + } + + if (cctx.portableEnabled()) + updated = (V)cctx.kernalContext().portable().detachPortable(updated); + + // Update index inside synchronization since it can be updated + // in load methods without actually holding entry lock. 
+ updateIndex(updated, valBytes, newExpireTime, newVer, old); + + update(updated, valBytes, newExpireTime, ttl0, newVer); + + drReplicate(drType, updated, valBytes, newVer); + + recordNodeId(affNodeId); + + if (evt) { + V evtOld = null; + + if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, evtNodeId, null, + newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, + evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName); + } + + if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, evtNodeId, null, + newVer, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld, + evtOld != null || hadVal, subjId, null, taskName); + } + } + } + else { + if (intercept) { + interceptRes = cctx.config().<K, V>getInterceptor().onBeforeRemove(key, old); + + if (cctx.cancelRemove(interceptRes)) + return new GridCacheUpdateAtomicResult<>(false, + cctx.<V>unwrapTemporary(interceptRes.get2()), + null, + invokeRes, + -1L, + -1L, + null, + null, + false); + } + + if (writeThrough) + // Must persist inside synchronization in non-tx mode. + cctx.store().removeFromStore(null, key); + + // Update index inside synchronization since it can be updated + // in load methods without actually holding entry lock. + clearIndex(old); + + if (hadVal) { + assert !deletedUnlocked(); + + if (!isInternal()) + deletedUnlocked(true); + } + else { + boolean new0 = isNew(); + + assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + this + ", locNodeId=" + + cctx.localNodeId() + ']'; + + if (new0) { + if (!isInternal()) + deletedUnlocked(true); + } + } + + enqueueVer = newVer; + + boolean hasValPtr = valPtr != 0; + + // Clear value on backup. Entry will be removed from cache when it got evicted from queue. + update(null, null, 0, 0, newVer); + + if (cctx.offheapTiered() && hasValPtr) { + boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); + + assert rmv; + } + + clearReaders(); + + recordNodeId(affNodeId); + + drReplicate(drType, null, null, newVer); + + if (evt) { + V evtOld = null; + + if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, evtNodeId, null, + newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, + evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName); + } + + if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, + EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal, + subjId, null, taskName); + } + } + + res = hadVal; + + // Do not propagate zeroed TTL and expire time. 
+ newTtl = -1L; + newDrExpireTime = -1L; + } + - if (metrics) - cctx.cache().metrics0().onWrite(); ++ if (res) ++ updateMetrics(op, metrics); + + if (primary || cctx.isReplicated()) + cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + + cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); + + if (intercept) { + if (op == GridCacheOperation.UPDATE) + cctx.config().getInterceptor().onAfterPut(key, val); + else + cctx.config().getInterceptor().onAfterRemove(key, old); + + if (interceptRes != null) + old = cctx.unwrapTemporary(interceptRes.get2()); + } + } + + if (log.isDebugEnabled()) + log.debug("Updated cache entry [val=" + val + ", old=" + old + ", entry=" + this + ']'); + + return new GridCacheUpdateAtomicResult<>(res, + old, + updated, + invokeRes, + newTtl, + newDrExpireTime, + enqueueVer, + drRes, + true); + } + + /** + * Perform DR if needed. + * + * @param drType DR type. + * @param val Value. + * @param valBytes Value bytes. + * @param ver Version. + * @throws IgniteCheckedException In case of exception. + */ + private void drReplicate(GridDrType drType, @Nullable V val, @Nullable byte[] valBytes, GridCacheVersion ver) + throws IgniteCheckedException { + if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal()) + cctx.dr().replicate(key, keyBytes, val, valBytes, rawTtl(), rawExpireTime(), ver.drVersion(), drType); + } + + /** + * @return {@code true} if entry has readers. It makes sense only for dht entry. + * @throws GridCacheEntryRemovedException If removed. + */ + protected boolean hasReaders() throws GridCacheEntryRemovedException { + return false; + } + + /** + * + */ + protected void clearReaders() { + // No-op. + } + + /** + * @param nodeId Node ID to clear. + */ + protected void clearReader(UUID nodeId) throws GridCacheEntryRemovedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean clear(GridCacheVersion ver, boolean readers, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException { + cctx.denyOnFlag(READ); + + boolean ret; + boolean rmv; + boolean marked; + + while (true) { + ret = false; + rmv = false; + marked = false; + + // For optimistic check. + GridCacheVersion startVer = null; + + if (!F.isEmptyOrNulls(filter)) { + synchronized (this) { + startVer = this.ver; + } + + if (!cctx.isAll(this, filter)) + return false; + } + + synchronized (this) { + if (startVer != null && !startVer.equals(this.ver)) + // Version has changed since filter checking. + continue; + + V val = saveValueForIndexUnlocked(); + + try { + if ((!hasReaders() || readers)) { + // markObsolete will clear the value. + if (!(marked = markObsolete0(ver, true))) { + if (log.isDebugEnabled()) + log.debug("Entry could not be marked obsolete (it is still used): " + this); + + break; + } + + clearReaders(); + } + else { + if (log.isDebugEnabled()) + log.debug("Entry could not be marked obsolete (it still has readers): " + this); + + break; + } + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry when clearing (will simply return): " + this); + + ret = true; + + break; + } + + if (log.isDebugEnabled()) + log.debug("Entry has been marked obsolete: " + this); + + clearIndex(val); + + releaseSwap(); + + ret = true; + rmv = true; + + break; + } + } + + if (marked) + onMarkedObsolete(); + + if (rmv) + cctx.cache().removeEntry(this); // Clear cache. 
+ + return ret; + } + + /** {@inheritDoc} */ + @Override public synchronized GridCacheVersion obsoleteVersion() { + return obsoleteVersionExtras(); + } + + /** {@inheritDoc} */ + @Override public boolean markObsolete(GridCacheVersion ver) { + boolean obsolete; + + synchronized (this) { + obsolete = markObsolete0(ver, true); + } + + if (obsolete) + onMarkedObsolete(); + + return obsolete; + } + + /** {@inheritDoc} */ + @Override public boolean markObsoleteIfEmpty(@Nullable GridCacheVersion ver) throws IgniteCheckedException { + boolean obsolete = false; + boolean deferred = false; + + try { + synchronized (this) { + if (obsoleteVersionExtras() != null) + return false; + + if (!hasValueUnlocked() || checkExpired()) { + if (ver == null) + ver = nextVersion(); + + if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) { + if (!deletedUnlocked()) { + update(null, null, 0L, 0L, ver); + + deletedUnlocked(true); + + deferred = true; + } + } + else + obsolete = markObsolete0(ver, true); + } + } + } + finally { + if (obsolete) + onMarkedObsolete(); + + if (deferred) + cctx.onDeferredDelete(this, ver); + } + + return obsolete; + } + + /** {@inheritDoc} */ + @Override public boolean markObsoleteVersion(GridCacheVersion ver) { + assert cctx.deferredDelete(); + + boolean marked; + + synchronized (this) { + if (obsoleteVersionExtras() != null) + return true; + + if (!this.ver.equals(ver)) + return false; + + marked = markObsolete0(ver, true); + } + + if (marked) + onMarkedObsolete(); + + return marked; + } + + /** + * <p> + * Note that {@link #onMarkedObsolete()} should always be called after this method + * returns {@code true}. + * + * @param ver Version. + * @param clear {@code True} to clear. + * @return {@code True} if entry is obsolete, {@code false} if entry is still used by other threads or nodes. + */ + protected final boolean markObsolete0(GridCacheVersion ver, boolean clear) { + assert Thread.holdsLock(this); + + GridCacheVersion obsoleteVer = obsoleteVersionExtras(); + + if (ver != null) { + // If already obsolete, then do nothing. + if (obsoleteVer != null) + return true; + + GridCacheMvcc<K> mvcc = mvccExtras(); + + if (mvcc == null || mvcc.isEmpty(ver)) { + obsoleteVer = ver; + + obsoleteVersionExtras(obsoleteVer); + + if (clear) + value(null, null); + } + + return obsoleteVer != null; + } + else + return obsoleteVer != null; + } + + /** {@inheritDoc} */ + @Override public void onMarkedObsolete() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public final synchronized boolean obsolete() { + return obsoleteVersionExtras() != null; + } + + /** {@inheritDoc} */ + @Override public final synchronized boolean obsolete(GridCacheVersion exclude) { + GridCacheVersion obsoleteVer = obsoleteVersionExtras(); + + return obsoleteVer != null && !obsoleteVer.equals(exclude); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean invalidate(@Nullable GridCacheVersion curVer, GridCacheVersion newVer) + throws IgniteCheckedException { + assert newVer != null; + + if (curVer == null || ver.equals(curVer)) { + V val = saveValueForIndexUnlocked(); + + value(null, null); + + ver = newVer; + + releaseSwap(); + + clearIndex(val); + + onInvalidate(); + } + + return obsoleteVersionExtras() != null; + } + + /** + * Called when entry invalidated. + */ + protected void onInvalidate() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public boolean invalidate(@Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) + throws GridCacheEntryRemovedException, IgniteCheckedException { + if (F.isEmptyOrNulls(filter)) { + synchronized (this) { + checkObsolete(); + + invalidate(null, nextVersion()); + + return true; + } + } + else { + // For optimistic checking. + GridCacheVersion startVer; + + synchronized (this){ + checkObsolete(); + + startVer = ver; + } + + if (!cctx.isAll(this, filter)) + return false; + + synchronized (this) { + checkObsolete(); + + if (startVer.equals(ver)) { + invalidate(null, nextVersion()); + + return true; + } + } + + // If version has changed then repeat the process. + return invalidate(filter); + } + } + + /** {@inheritDoc} */ + @Override public boolean compact(@Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) + throws GridCacheEntryRemovedException, IgniteCheckedException { + // For optim
<TRUNCATED>
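
The recurring change across the hunks above is that cache statistics are now updated only when statistics are enabled on the cache configuration: reads are recorded through metrics0().onRead(...) unless the update is a putIfAbsent-style operation, removals are counted via onRemove() instead of onWrite(), and the atomic update paths delegate to an updateMetrics(op, metrics) helper whose body lies beyond the truncated part of this diff. The snippet below is only an illustrative sketch of that pattern, assuming the helper does nothing more than route the completed operation to the matching counter; it is not the commit's actual implementation.

    /**
     * Illustrative sketch (not taken from the diff above): update cache metrics for a
     * completed update, honoring the statistics-enabled flag used throughout this merge.
     *
     * @param op Completed operation (UPDATE or DELETE).
     * @param metrics Whether the caller requested metrics to be updated.
     */
    private void updateMetrics(GridCacheOperation op, boolean metrics) {
        if (metrics && cctx.cache().configuration().isStatisticsEnabled()) {
            if (op == GridCacheOperation.DELETE)
                cctx.cache().metrics0().onRemove(); // Removals are no longer counted as writes.
            else
                cctx.cache().metrics0().onWrite();  // Puts and transforms still count as writes.
        }
    }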