http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java deleted file mode 100644 index c27287b..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ /dev/null @@ -1,760 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * DHT cache entry. Registers itself with its local partition on creation and keeps track of - * the nodes that read this entry (readers). - */ -@SuppressWarnings({"TooBroadScope", "NonPrivateFieldAccessedInSynchronizedContext"}) -public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Size overhead. */ - private static final int DHT_SIZE_OVERHEAD = 16; - - /** Closure that extracts the node ID from a reader ID. */ - private static final IgniteClosure<ReaderId, UUID> R2N = new C1<ReaderId, UUID>() { - @Override public UUID apply(ReaderId e) { - return e.nodeId(); - } - }; - - /** Reader clients. The list is never modified in place: every change installs a new list. */ - @GridToStringInclude - private volatile List<ReaderId<K, V>> rdrs = Collections.emptyList(); - - /** Local partition. */ - private final GridDhtLocalPartition<K, V> locPart; - - /** - * @param ctx Cache context. - * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). - * @param key Cache key. - * @param hash Key hash value. - * @param val Entry value. - * @param next Next entry in the linked list. - * @param ttl Time to live. - * @param hdrId Header id. - */ - public GridDhtCacheEntry(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val, - GridCacheMapEntry<K, V> next, long ttl, int hdrId) { - super(ctx, key, hash, val, next, ttl, hdrId); - - // Record this entry with partition. 
- locPart = ctx.dht().topology().onAdded(topVer, this); - } - - /** {@inheritDoc} */ - @Override public int memorySize() throws IgniteCheckedException { - int rdrsOverhead = 0; - - synchronized (this) { - if (rdrs != null) - rdrsOverhead += ReaderId.READER_ID_SIZE * rdrs.size(); - } - - return super.memorySize() + DHT_SIZE_OVERHEAD + rdrsOverhead; - } - - /** {@inheritDoc} */ - @Override public int partition() { - return locPart.id(); - } - - /** {@inheritDoc} */ - @Override public boolean isDht() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean partitionValid() { - return locPart.valid(); - } - - /** {@inheritDoc} */ - @Override public void onMarkedObsolete() { - assert !Thread.holdsLock(this); - - // Remove this entry from partition mapping. - cctx.dht().topology().onRemoved(this); - } - - /** - * @param nearVer Near version. - * @param rmv If {@code true}, then add the near version to removed list if candidate is not found. - * @return Local candidate whose other version equals the near version, or {@code null} if there is none. - * @throws GridCacheEntryRemovedException If removed. - */ - @Nullable public synchronized GridCacheMvccCandidate<K> localCandidateByNearVersion(GridCacheVersion nearVer, - boolean rmv) throws GridCacheEntryRemovedException { - checkObsolete(); - - GridCacheMvcc<K> mvcc = mvccExtras(); - - if (mvcc != null) { - for (GridCacheMvccCandidate<K> c : mvcc.localCandidatesNoCopy(false)) { - GridCacheVersion ver = c.otherVersion(); - - if (ver != null && ver.equals(nearVer)) - return c; - } - } - - if (rmv) - addRemoved(nearVer); - - return null; - } - - /** - * Add local candidate. - * - * @param nearNodeId Near node ID. - * @param nearVer Near version. - * @param topVer Topology version. - * @param threadId Owning thread ID. - * @param ver Lock version. - * @param timeout Timeout to acquire lock. - * @param reenter Reentry flag. - * @param tx Transaction flag. - * @param implicitSingle Implicit-single flag. - * @return New candidate, or {@code null} if the candidate was not added. - * @throws GridCacheEntryRemovedException If entry has been removed. 
- * @throws GridDistributedLockCancelledException If lock was cancelled. - */ - @Nullable public GridCacheMvccCandidate<K> addDhtLocal( - UUID nearNodeId, - GridCacheVersion nearVer, - long topVer, - long threadId, - GridCacheVersion ver, - long timeout, - boolean reenter, - boolean tx, - boolean implicitSingle) throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { - GridCacheMvccCandidate<K> cand; - GridCacheMvccCandidate<K> prev; - GridCacheMvccCandidate<K> owner; - - V val; - - synchronized (this) { - // Check removed locks prior to obsolete flag. - checkRemoved(ver); - checkRemoved(nearVer); - - checkObsolete(); - - GridCacheMvcc<K> mvcc = mvccExtras(); - - if (mvcc == null) { - mvcc = new GridCacheMvcc<>(cctx); - - mvccExtras(mvcc); - } - - prev = mvcc.anyOwner(); - - boolean emptyBefore = mvcc.isEmpty(); - - cand = mvcc.addLocal( - this, - nearNodeId, - nearVer, - threadId, - ver, - timeout, - reenter, - tx, - implicitSingle, - /*dht-local*/true - ); - - // No candidate was added, so there is nothing to link or to report. - if (cand == null) - return null; - - cand.topologyVersion(topVer); - - owner = mvcc.anyOwner(); - - if (owner != null) - cand.ownerVersion(owner.version()); - - boolean emptyAfter = mvcc.isEmpty(); - - checkCallbacks(emptyBefore, emptyAfter); - - val = this.val; - - // Drop MVCC extras if no candidates are left. - if (mvcc != null && mvcc.isEmpty()) - mvccExtras(null); - } - - // Don't link reentries. - if (cand != null && !cand.reentry()) - // Link with other candidates in the same thread. - cctx.mvcc().addNext(cctx, cand); - - checkOwnerChanged(prev, owner, val); - - return cand; - } - - /** {@inheritDoc} */ - @Override public boolean tmLock(IgniteTxEx<K, V> tx, long timeout) - throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { - if (tx.local()) { - GridDhtTxLocalAdapter<K, V> dhtTx = (GridDhtTxLocalAdapter<K, V>)tx; - - // Null is returned if timeout is negative and there is other lock owner. 
- return addDhtLocal( - dhtTx.nearNodeId(), - dhtTx.nearXidVersion(), - tx.topologyVersion(), - tx.threadId(), - tx.xidVersion(), - timeout, - /*reenter*/false, - /*tx*/true, - tx.implicitSingle()) != null; - } - - try { - addRemote( - tx.nodeId(), - tx.otherNodeId(), - tx.threadId(), - tx.xidVersion(), - tx.timeout(), - /*tx*/true, - tx.implicit(), - null); - - return true; - } - catch (GridDistributedLockCancelledException ignored) { - if (log.isDebugEnabled()) - log.debug("Attempted to enter tx lock for cancelled ID (will ignore): " + tx); - - return false; - } - } - - /** {@inheritDoc} */ - @Override public GridCacheMvccCandidate<K> removeLock() { - GridCacheMvccCandidate<K> ret = super.removeLock(); - - locPart.onUnlock(); - - return ret; - } - - /** {@inheritDoc} */ - @Override public boolean removeLock(GridCacheVersion ver) throws GridCacheEntryRemovedException { - boolean ret = super.removeLock(ver); - - locPart.onUnlock(); - - return ret; - } - - /** - * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition. - */ - public void onUnlock() { - locPart.onUnlock(); - } - - /** - * @param topVer Topology version (not used by this method). - * @return Tuple with version, value and value bytes of this entry, or {@code null} if entry is - * new, not valid or deleted. - * @throws GridCacheEntryRemovedException If entry has been removed. - */ - @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext"}) - @Nullable public synchronized GridTuple3<GridCacheVersion, V, byte[]> versionedValue(long topVer) - throws GridCacheEntryRemovedException { - if (isNew() || !valid(-1) || deletedUnlocked()) - return null; - else { - V val0 = null; - byte[] valBytes0 = null; - - GridCacheValueBytes valBytesTuple = valueBytesUnlocked(); - - if (!valBytesTuple.isNull()) { - if (valBytesTuple.isPlain()) - val0 = (V)valBytesTuple.get(); - else - valBytes0 = valBytesTuple.get(); - } - else - val0 = val; - - return F.t(ver, val0, valBytes0); - } - } - - /** - * @return Readers. 
- * @throws GridCacheEntryRemovedException If removed. - */ - public Collection<UUID> readers() throws GridCacheEntryRemovedException { - return F.viewReadOnly(checkReaders(), R2N); - } - - /** - * @param nodeId Node ID. - * @return Reader ID for the given node, or {@code null} if the node is not a reader. - */ - @Nullable public ReaderId<K, V> readerId(UUID nodeId) { - for (ReaderId<K, V> reader : rdrs) - if (reader.nodeId().equals(nodeId)) - return reader; - - return null; - } - - /** - * @param nodeId Reader to add. - * @param msgId Message ID. - * @param topVer Topology version. - * @return Future for all relevant transactions that were active at the time of adding reader, - * or {@code null} if the reader was not added or there is nothing to wait for. - * @throws GridCacheEntryRemovedException If entry was removed. - */ - @SuppressWarnings("unchecked") - @Nullable public IgniteFuture<Boolean> addReader(UUID nodeId, long msgId, long topVer) - throws GridCacheEntryRemovedException { - // Don't add local node as reader. - if (cctx.nodeId().equals(nodeId)) - return null; - - ClusterNode node = cctx.discovery().node(nodeId); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Ignoring near reader because node left the grid: " + nodeId); - - return null; - } - - // If remote node has no near cache, don't add it. - if (!U.hasNearCache(node, cacheName()) && !(key instanceof GridCacheInternal)) { - if (log.isDebugEnabled()) - log.debug("Ignoring near reader because near cache is disabled: " + nodeId); - - return null; - } - - // If remote node is an affinity node for this partition, don't add it as a reader. 
- if (cctx.affinity().belongs(node, partition(), topVer)) { - if (log.isDebugEnabled()) - log.debug("Ignoring near reader because remote node is affinity node [locNodeId=" + cctx.localNodeId() - + ", rmtNodeId=" + nodeId + ", key=" + key + ']'); - - return null; - } - - boolean ret = false; - - GridCacheMultiTxFuture<K, V> txFut = null; - - Collection<GridCacheMvccCandidate<K>> cands = null; - - ReaderId<K, V> reader; - - synchronized (this) { - checkObsolete(); - - reader = readerId(nodeId); - - if (reader == null) { - reader = new ReaderId<>(nodeId, msgId); - - List<ReaderId<K, V>> rdrs = new ArrayList<>(this.rdrs.size() + 1); - - rdrs.addAll(this.rdrs); - rdrs.add(reader); - - // Seal. - this.rdrs = Collections.unmodifiableList(rdrs); - - // No transactions in ATOMIC cache. - if (!cctx.atomic()) { - txFut = reader.getOrCreateTxFuture(cctx); - - cands = localCandidates(); - - ret = true; - } - } - else { - txFut = reader.txFuture(); - - long id = reader.messageId(); - - if (id < msgId) - reader.messageId(msgId); - } - } - - if (ret) { - assert txFut != null; - - if (!F.isEmpty(cands)) { - for (GridCacheMvccCandidate<K> c : cands) { - IgniteTxEx<K, V> tx = cctx.tm().tx(c.version()); - - if (tx != null) { - assert tx.local(); - - txFut.addTx(tx); - } - } - } - - txFut.init(); - - if (!txFut.isDone()) { - final ReaderId<K, V> reader0 = reader; - - txFut.listenAsync(new CI1<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> f) { - // Lock the entry itself: a bare 'this' here would be the listener closure, not the entry. - synchronized (GridDhtCacheEntry.this) { - // Release memory. - reader0.resetTxFuture(); - } - } - }); - } - else { - synchronized (this) { - // Release memory. - reader.resetTxFuture(); - } - - txFut = null; - } - } - - return txFut; - } - - /** - * @param nodeId Reader to remove. - * @param msgId Message ID. - * @return {@code True} if reader was removed as a result of this operation. - * @throws GridCacheEntryRemovedException If entry was removed. 
- */ - public synchronized boolean removeReader(UUID nodeId, long msgId) throws GridCacheEntryRemovedException { - checkObsolete(); - - ReaderId reader = readerId(nodeId); - - // Keep the reader if it has a newer message ID; a negative msgId removes it unconditionally. - if (reader == null || (reader.messageId() > msgId && msgId >= 0)) - return false; - - List<ReaderId<K, V>> rdrs = new ArrayList<>(this.rdrs.size()); - - for (ReaderId<K, V> rdr : this.rdrs) { - if (!rdr.equals(reader)) - rdrs.add(rdr); - } - - // Seal. - this.rdrs = rdrs.isEmpty() ? Collections.<ReaderId<K, V>>emptyList() : Collections.unmodifiableList(rdrs); - - return true; - } - - /** - * Clears all readers (usually when partition becomes invalid and ready for eviction). - */ - @Override public synchronized void clearReaders() { - rdrs = Collections.emptyList(); - } - - /** {@inheritDoc} */ - @Override public synchronized void clearReader(UUID nodeId) throws GridCacheEntryRemovedException { - removeReader(nodeId, -1); - } - - /** - * Marks entry as obsolete and, if possible or required, removes it - * from swap storage. - * - * @param ver Obsolete version. - * @param swap If {@code true} then remove from swap. - * @return {@code True} if entry was marked obsolete and cleared, {@code false} if it could not be - * marked obsolete. - * @throws IgniteCheckedException If failed to remove from swap. - */ - public boolean clearInternal(GridCacheVersion ver, boolean swap) throws IgniteCheckedException { - boolean rmv = false; - - try { - synchronized (this) { - V prev = saveValueForIndexUnlocked(); - - // Call markObsolete0 to avoid recursive calls to clear if - // we are clearing dht local partition (onMarkedObsolete should not be called). - if (!markObsolete0(ver, false)) { - if (log.isDebugEnabled()) - log.debug("Entry could not be marked obsolete (it is still used or has readers): " + this); - - return false; - } - - rdrs = Collections.emptyList(); - - if (log.isDebugEnabled()) - log.debug("Entry has been marked obsolete: " + this); - - clearIndex(prev); - - // Give to GC. 
- update(null, null, 0L, 0L, ver); - - if (swap) { - releaseSwap(); - - if (log.isDebugEnabled()) - log.debug("Entry has been cleared from swap storage: " + this); - } - - rmv = true; - - return true; - } - } - finally { - if (rmv) - cctx.cache().removeIfObsolete(key); // Clear cache. - } - } - - /** - * Drops readers whose nodes are no longer alive. - * - * @return Collection of readers that remain after the check. - * @throws GridCacheEntryRemovedException If removed. - */ - public synchronized Collection<ReaderId<K, V>> checkReaders() throws GridCacheEntryRemovedException { - checkObsolete(); - - if (!rdrs.isEmpty()) { - Collection<ReaderId> rmv = null; - - for (ReaderId reader : rdrs) { - if (!cctx.discovery().alive(reader.nodeId())) { - if (rmv == null) - rmv = new HashSet<>(); - - rmv.add(reader); - } - } - - if (rmv != null) { - List<ReaderId<K, V>> rdrs = new ArrayList<>(this.rdrs.size() - rmv.size()); - - for (ReaderId<K, V> rdr : this.rdrs) { - if (!rmv.contains(rdr)) - rdrs.add(rdr); - } - - // Seal. - this.rdrs = rdrs.isEmpty() ? Collections.<ReaderId<K, V>>emptyList() : - Collections.unmodifiableList(rdrs); - } - } - - return rdrs; - } - - /** {@inheritDoc} */ - @Override protected synchronized boolean hasReaders() throws GridCacheEntryRemovedException { - checkReaders(); - - return !rdrs.isEmpty(); - } - - /** - * Sets mappings into entry. - * - * @param ver Version. - * @param mappings Mappings to set. - * @return Candidate, if one existed for the version, or {@code null} if candidate was not found. - * @throws GridCacheEntryRemovedException If removed. - */ - @Nullable public synchronized GridCacheMvccCandidate<K> mappings(GridCacheVersion ver, Collection<UUID> mappings) - throws GridCacheEntryRemovedException { - checkObsolete(); - - GridCacheMvcc<K> mvcc = mvccExtras(); - - GridCacheMvccCandidate<K> cand = mvcc == null ? 
null : mvcc.candidate(ver); - - if (cand != null) - cand.mappedNodeIds(mappings); - - return cand; - } - - /** {@inheritDoc} */ - @Override public GridCacheEntry<K, V> wrap(boolean prjAware) { - GridCacheContext<K, V> nearCtx = cctx.dht().near().context(); - - GridCacheProjectionImpl<K, V> prjPerCall = nearCtx.projectionPerCall(); - - if (prjPerCall != null && prjAware) - return new GridPartitionedCacheEntryImpl<>(prjPerCall, nearCtx, key, this); - - return new GridPartitionedCacheEntryImpl<>(null, nearCtx, key, this); - } - - /** - * @return Name of the near cache associated with this DHT cache. - */ - protected String cacheName() { - return cctx.dht().near().name(); - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return S.toString(GridDhtCacheEntry.class, this, "super", super.toString()); - } - - /** - * Reader ID: node ID of a reader together with its message ID and an optional transaction future. - */ - private static class ReaderId<K, V> { - /** Reader ID size. */ - private static final int READER_ID_SIZE = 24; - - /** Node ID. */ - private UUID nodeId; - - /** Message ID. */ - private long msgId; - - /** Transaction future. */ - private GridCacheMultiTxFuture<K, V> txFut; - - /** - * @param nodeId Node ID. - * @param msgId Message ID. - */ - ReaderId(UUID nodeId, long msgId) { - this.nodeId = nodeId; - this.msgId = msgId; - } - - /** - * @return Node ID. - */ - UUID nodeId() { - return nodeId; - } - - /** - * @return Message ID. - */ - long messageId() { - return msgId; - } - - /** - * @param msgId Message ID. - */ - void messageId(long msgId) { - this.msgId = msgId; - } - - /** - * @param cctx Cache context. - * @return Existing transaction future, or a newly created one if there was none. - */ - GridCacheMultiTxFuture<K, V> getOrCreateTxFuture(GridCacheContext<K, V> cctx) { - if (txFut == null) - txFut = new GridCacheMultiTxFuture<>(cctx); - - return txFut; - } - - /** - * @return Transaction future, possibly {@code null}. - */ - GridCacheMultiTxFuture<K, V> txFuture() { - return txFut; - } - - /** - * Sets multi-transaction future to {@code null}. - * - * @return Previous transaction future. 
- */ - GridCacheMultiTxFuture<K, V> resetTxFuture() { - GridCacheMultiTxFuture<K, V> txFut = this.txFut; - - this.txFut = null; - - return txFut; - } - - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof ReaderId)) - return false; - - ReaderId readerId = (ReaderId)o; - - return msgId == readerId.msgId && nodeId.equals(readerId.nodeId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - - res = 31 * res + (int)(msgId ^ (msgId >>> 32)); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ReaderId.class, this); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntryImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntryImpl.java deleted file mode 100644 index c703365..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntryImpl.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.cache.GridCachePeekMode.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; - -/** - * Colocated cache entry public API. - */ -public class GridDhtCacheEntryImpl<K, V> extends GridCacheEntryImpl<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDhtCacheEntryImpl() { - // No-op. - } - - /** - * @param nearPrj Parent projection or {@code null} if entry belongs to default cache. - * @param ctx Near cache context. - * @param key Key. - * @param cached Cached entry (either from near or dht cache map). - */ - @SuppressWarnings({"TypeMayBeWeakened"}) - public GridDhtCacheEntryImpl(GridCacheProjectionImpl<K, V> nearPrj, GridCacheContext<K, V> ctx, K key, - @Nullable GridCacheEntryEx<K, V> cached) { - super(nearPrj, ctx, key, cached); - - assert !this.ctx.isDht() || !isNearEnabled(ctx); - } - - /** - * @return Dht cache. - */ - private GridDhtCacheAdapter<K, V> dht() { - return ctx.dht(); - } - - /** {@inheritDoc} */ - @Override public V peek(@Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException { - // 'modes' is declared @Nullable: treat null as "no explicit modes" instead of failing with NPE. - if (!ctx.isNear() && modes != null && modes.contains(NEAR_ONLY)) - return null; - - V val = null; - - if (modes == null || !modes.contains(PARTITIONED_ONLY)) - val = super.peek(modes); - - // Fall back to the DHT cache (null or empty modes are peeked with SMART mode there). - if (val == null) - val = peekDht0(modes, CU.<K, V>empty()); - - return val; - } - - /** - * @param filter Filter. - * @return Peeked value. 
- */ - @Nullable private V peekDht(@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { - try { - return peekDht0(SMART, filter); - } - catch (IgniteCheckedException e) { - // Should never happen. - throw new IgniteException("Unable to perform entry peek() operation.", e); - } - } - - /** - * @param modes Peek modes ({@code null} or empty means {@code SMART}). - * @param filter Optional entry filter. - * @return First non-null value peeked with the given modes, or {@code null} if there is none. - * @throws IgniteCheckedException If failed. - */ - @Nullable private V peekDht0(@Nullable Collection<GridCachePeekMode> modes, - @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException { - if (F.isEmpty(modes)) - return peekDht0(SMART, filter); - - assert modes != null; - - for (GridCachePeekMode mode : modes) { - V val = peekDht0(mode, filter); - - if (val != null) - return val; - } - - return null; - } - - /** - * @param mode Peek mode ({@code null} means {@code SMART}). - * @param filter Optional entry filter. - * @return Peeked value. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings({"unchecked"}) - @Nullable private V peekDht0(@Nullable GridCachePeekMode mode, - @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException { - if (mode == null) - mode = SMART; - - while (true) { - GridCacheProjectionImpl<K, V> prjPerCall = proxy.gateProjection(); - - if (prjPerCall != null) - filter = ctx.vararg(F.and(ctx.vararg(proxy.predicate()), filter)); - - GridCacheProjectionImpl<K, V> prev = ctx.gate().enter(prjPerCall); - - try { - GridCacheEntryEx<K, V> entry = dht().peekEx(key); - - return entry == null ? null : ctx.cloneOnFlag(entry.peek(mode, filter)); - } - catch (GridCacheEntryRemovedException ignore) { - // Entry was removed: loop and retry with a fresh lookup. - } - finally { - ctx.gate().leave(prev); - } - } - } - - /** {@inheritDoc} */ - @Override public boolean isLocked() { - // Check colocated explicit locks. - return ctx.mvcc().isLockedByThread(key, -1); - } - - /** {@inheritDoc} */ - @Override public boolean isLockedByThread() { - // Check colocated explicit locks. 
- return ctx.mvcc().isLockedByThread(key, Thread.currentThread().getId()); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtCacheEntryImpl.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java deleted file mode 100644 index 7de6d64..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.tostring.*; - -import java.io.*; -import java.util.*; - -/** - * Embedded DHT future that also reports invalid partitions. - */ -public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implements GridDhtFuture<A> { - /** */ - private static final long serialVersionUID = 0L; - - /** Invalid partitions (retries) returned by {@link #invalidPartitions()}. */ - @GridToStringInclude - private Collection<Integer> invalidParts; - - /** - * Empty constructor required for {@link Externalizable}. Does not assign {@code invalidParts}. - */ - public GridDhtEmbeddedFuture() { - // No-op. - } - - /** - * Creates a future with an empty collection of invalid partitions. - * - * @param ctx Context. - * @param embedded Embedded. - * @param c Closure. - */ - public GridDhtEmbeddedFuture(GridKernalContext ctx, IgniteFuture<B> embedded, IgniteBiClosure<B, Exception, A> c) { - super(ctx, embedded, c); - - invalidParts = Collections.emptyList(); - } - - /** - * Creates a future with an empty collection of invalid partitions. - * - * @param embedded Future to embed. - * @param c Embedding closure. - * @param ctx Kernal context. - */ - public GridDhtEmbeddedFuture(IgniteFuture<B> embedded, - IgniteBiClosure<B, Exception, IgniteFuture<A>> c, GridKernalContext ctx) { - super(embedded, c, ctx); - - invalidParts = Collections.emptyList(); - } - - /** - * @param ctx Context. - * @param embedded Embedded. - * @param c Closure. - * @param invalidParts Invalid partitions (retries); stored as is, not copied. 
- */ - public GridDhtEmbeddedFuture(GridKernalContext ctx, IgniteFuture<B> embedded, IgniteBiClosure<B, Exception, A> c, - Collection<Integer> invalidParts) { - super(ctx, embedded, c); - - this.invalidParts = invalidParts; - } - - /** {@inheritDoc} */ - @Override public Collection<Integer> invalidPartitions() { - return invalidParts; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtEmbeddedFuture.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFinishedFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFinishedFuture.java deleted file mode 100644 index 386a012..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFinishedFuture.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.future.*; - -import java.io.*; -import java.util.*; - -/** - * Finished DHT future. Always reports an empty collection of invalid partitions. - */ -public class GridDhtFinishedFuture<T> extends GridFinishedFuture<T> implements GridDhtFuture<T> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridDhtFinishedFuture() { - // No-op. - } - - /** - * Creates a future finished with the given result. - * - * @param ctx Context. - * @param t Result. - */ - public GridDhtFinishedFuture(GridKernalContext ctx, T t) { - super(ctx, t); - } - - /** - * Creates a future finished with the given error. - * - * @param ctx Context. - * @param err Error. - */ - public GridDhtFinishedFuture(GridKernalContext ctx, Throwable err) { - super(ctx, err); - } - - /** {@inheritDoc} */ - @Override public Collection<Integer> invalidPartitions() { - return Collections.emptyList(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtFinishedFuture.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFuture.java deleted file mode 100644 index 40a87ca..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFuture.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.lang.*; - -import java.util.*; - -/** - * Future that can additionally report the invalid partitions whose keys have to be retried. - */ -public interface GridDhtFuture<T> extends IgniteFuture<T> { - /** - * Note that future should be able to provide keys to retry before - * it completes, so it's not necessary to wait till future is done - * to get retry keys. - * - * @return Invalid partitions, i.e. keys to retry because this node is no longer a primary or backup. 
- */ - public Collection<Integer> invalidPartitions(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java deleted file mode 100644 index 8a1eb1b..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ /dev/null @@ -1,451 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * - */ -public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Collection<GridCacheEntryInfo<K, V>>> - implements GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Logger reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Message ID. */ - private long msgId; - - /** */ - private UUID reader; - - /** Reload flag. */ - private boolean reload; - - /** Read through flag. */ - private boolean readThrough; - - /** Context. */ - private GridCacheContext<K, V> cctx; - - /** Keys. */ - private LinkedHashMap<? extends K, Boolean> keys; - - /** Reserved partitions. */ - private Collection<GridDhtLocalPartition> parts = new GridLeanSet<>(5); - - /** Future ID. */ - private IgniteUuid futId; - - /** Version. */ - private GridCacheVersion ver; - - /** Topology version .*/ - private long topVer; - - /** Transaction. */ - private IgniteTxLocalEx<K, V> tx; - - /** Filters. */ - private IgnitePredicate<GridCacheEntry<K, V>>[] filters; - - /** Logger. */ - private IgniteLogger log; - - /** Retries because ownership changed. */ - private Collection<Integer> retries = new GridLeanSet<>(); - - /** Subject ID. */ - private UUID subjId; - - /** Task name. 
*/ - private int taskNameHash; - - /** Whether to deserialize portable objects. */ - private boolean deserializePortable; - - /** Expiry policy. */ - private IgniteCacheExpiryPolicy expiryPlc; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDhtGetFuture() { - // No-op. - } - - /** - * @param cctx Context. - * @param msgId Message ID. - * @param reader Reader. - * @param keys Keys. - * @param readThrough Read through flag. - * @param reload Reload flag. - * @param tx Transaction. - * @param topVer Topology version. - * @param filters Filters. - * @param subjId Subject ID. - * @param taskNameHash Task name hash code. - * @param deserializePortable Deserialize portable flag. - * @param expiryPlc Expiry policy. - */ - public GridDhtGetFuture( - GridCacheContext<K, V> cctx, - long msgId, - UUID reader, - LinkedHashMap<? extends K, Boolean> keys, - boolean readThrough, - boolean reload, - @Nullable IgniteTxLocalEx<K, V> tx, - long topVer, - @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filters, - @Nullable UUID subjId, - int taskNameHash, - boolean deserializePortable, - @Nullable IgniteCacheExpiryPolicy expiryPlc) { - super(cctx.kernalContext(), CU.<GridCacheEntryInfo<K, V>>collectionsReducer()); - - assert reader != null; - assert !F.isEmpty(keys); - - this.reader = reader; - this.cctx = cctx; - this.msgId = msgId; - this.keys = keys; - this.readThrough = readThrough; - this.reload = reload; - this.filters = filters; - this.tx = tx; - this.topVer = topVer; - this.subjId = subjId; - this.deserializePortable = deserializePortable; - this.taskNameHash = taskNameHash; - this.expiryPlc = expiryPlc; - - futId = IgniteUuid.randomUuid(); - - ver = tx == null ? cctx.versions().next() : tx.xidVersion(); - - log = U.logger(ctx, logRef, GridDhtGetFuture.class); - - syncNotify(true); - } - - /** - * Initializes future. - */ - void init() { - map(keys); - - markInitialized(); - } - - /** - * @return Keys. - */ - Collection<? 
extends K> keys() { - return keys.keySet(); - } - - /** {@inheritDoc} */ - @Override public Collection<Integer> invalidPartitions() { - return retries; - } - - /** - * @return Future ID. - */ - public IgniteUuid futureId() { - return futId; - } - - /** - * @return Future version. - */ - public GridCacheVersion version() { - return ver; - } - - /** {@inheritDoc} */ - @Override public boolean onDone(Collection<GridCacheEntryInfo<K, V>> res, Throwable err) { - if (super.onDone(res, err)) { - // Release all partitions reserved by this future. - for (GridDhtLocalPartition part : parts) - part.release(); - - return true; - } - - return false; - } - - /** - * @param keys Keys. - */ - private void map(final LinkedHashMap<? extends K, Boolean> keys) { - GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer); - - if (!F.isEmpty(fut.invalidPartitions())) - retries.addAll(fut.invalidPartitions()); - - add(new GridEmbeddedFuture<>(cctx.kernalContext(), fut, - new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo<K, V>>>() { - @Override public Collection<GridCacheEntryInfo<K, V>> apply(Object o, Exception e) { - if (e != null) { // Check error first. - if (log.isDebugEnabled()) - log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']'); - - onDone(e); - } - - LinkedHashMap<K, Boolean> mappedKeys = U.newLinkedHashMap(keys.size()); - - // Assign keys to primary nodes. - for (Map.Entry<? extends K, Boolean> key : keys.entrySet()) { - int part = cctx.affinity().partition(key.getKey()); - - if (!retries.contains(part)) { - if (!map(key.getKey(), parts)) - retries.add(part); - else - mappedKeys.put(key.getKey(), key.getValue()); - } - } - - // Add new future. - add(getAsync(mappedKeys)); - - // Finish this one. - return Collections.emptyList(); - } - }) - ); - } - - /** - * @param key Key. - * @param parts Parts to map. - * @return {@code True} if mapped. 
- */ - private boolean map(K key, Collection<GridDhtLocalPartition> parts) { - GridDhtLocalPartition part = topVer > 0 ? - cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) : - cache().topology().localPartition(key, false); - - if (part == null) - return false; - - if (!parts.contains(part)) { - // By reserving, we make sure that partition won't be unloaded while processed. - if (part.reserve()) { - parts.add(part); - - return true; - } - else - return false; - } - else - return true; - } - - /** - * @param keys Keys to get. - * @return Future for local get. - */ - @SuppressWarnings( {"unchecked", "IfMayBeConditional"}) - private IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> getAsync(final LinkedHashMap<? extends K, Boolean> keys) { - if (F.isEmpty(keys)) - return new GridFinishedFuture<Collection<GridCacheEntryInfo<K, V>>>(cctx.kernalContext(), - Collections.<GridCacheEntryInfo<K, V>>emptyList()); - - final Collection<GridCacheEntryInfo<K, V>> infos = new LinkedList<>(); - - String taskName0 = ctx.job().currentTaskName(); - - if (taskName0 == null) - taskName0 = ctx.task().resolveTaskName(taskNameHash); - - final String taskName = taskName0; - - GridCompoundFuture<Boolean, Boolean> txFut = null; - - for (Map.Entry<? extends K, Boolean> k : keys.entrySet()) { - while (true) { - GridDhtCacheEntry<K, V> e = cache().entryExx(k.getKey(), topVer); - - try { - GridCacheEntryInfo<K, V> info = e.info(); - - // If entry is obsolete. - if (info == null) - continue; - - // Register reader. If there are active transactions for this entry, - // then will wait for their completion before proceeding. - // TODO: GG-4003: - // TODO: What if any transaction we wait for actually removes this entry? - // TODO: In this case seems like we will be stuck with untracked near entry. - // TODO: To fix, check that reader is contained in the list of readers once - // TODO: again after the returned future completes - if not, try again. 
- // TODO: Also, why is info read before transactions are complete, and not after? - IgniteFuture<Boolean> f = (!e.deleted() && k.getValue()) ? e.addReader(reader, msgId, topVer) : null; - - if (f != null) { - if (txFut == null) - txFut = new GridCompoundFuture<>(cctx.kernalContext(), CU.boolReducer()); - - txFut.add(f); - } - - infos.add(info); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry when getting a DHT value: " + e); - } - finally { - cctx.evicts().touch(e, topVer); - } - } - } - - if (txFut != null) - txFut.markInitialized(); - - IgniteFuture<Map<K, V>> fut; - - if (txFut == null || txFut.isDone()) { - if (reload && cctx.readThrough() && cctx.store().configured()) { - fut = cache().reloadAllAsync(keys.keySet(), - true, - subjId, - taskName, - filters); - } - else { - if (tx == null) { - fut = cache().getDhtAllAsync(keys.keySet(), - readThrough, - subjId, - taskName, - deserializePortable, - filters, - expiryPlc); - } - else { - fut = tx.getAllAsync(cctx, - keys.keySet(), - null, - deserializePortable, - filters); - } - } - } - else { - // If we are here, then there were active transactions for some entries - // when we were adding the reader. In that case we must wait for those - // transactions to complete. 
- fut = new GridEmbeddedFuture<>( - txFut, - new C2<Boolean, Exception, IgniteFuture<Map<K, V>>>() { - @Override public IgniteFuture<Map<K, V>> apply(Boolean b, Exception e) { - if (e != null) - throw new GridClosureException(e); - - if (reload && cctx.readThrough() && cctx.store().configured()) { - return cache().reloadAllAsync(keys.keySet(), - true, - subjId, - taskName, - filters); - } - else { - if (tx == null) { - return cache().getDhtAllAsync(keys.keySet(), - readThrough, - subjId, - taskName, - deserializePortable, - filters, - expiryPlc); - } - else { - return tx.getAllAsync(cctx, - keys.keySet(), - null, - deserializePortable, - filters); - } - } - } - }, - cctx.kernalContext()); - } - - return new GridEmbeddedFuture<>(cctx.kernalContext(), fut, - new C2<Map<K, V>, Exception, Collection<GridCacheEntryInfo<K, V>>>() { - @Override public Collection<GridCacheEntryInfo<K, V>> apply(Map<K, V> map, Exception e) { - if (e != null) { - onDone(e); - - return Collections.emptyList(); - } - else { - for (Iterator<GridCacheEntryInfo<K, V>> it = infos.iterator(); it.hasNext();) { - GridCacheEntryInfo<K, V> info = it.next(); - - V v = map.get(info.key()); - - if (v == null) - it.remove(); - else - info.value(v); - } - - return infos; - } - } - }); - } - - /** - * @return DHT cache. 
- */ - private GridDhtCacheAdapter<K, V> cache() { - return (GridDhtCacheAdapter<K, V>)cctx.cache(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtInvalidPartitionException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtInvalidPartitionException.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtInvalidPartitionException.java deleted file mode 100644 index b4b6c01..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtInvalidPartitionException.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -/** - * Exception thrown whenever entry is created for invalid partition. - */ -public class GridDhtInvalidPartitionException extends RuntimeException { - /** */ - private static final long serialVersionUID = 0L; - - /** Partition. 
/**
 * Exception thrown whenever entry is created for invalid partition.
 */
public class GridDhtInvalidPartitionException extends RuntimeException {
    /** */
    private static final long serialVersionUID = 0L;

    /** Partition. */
    private final int part;

    /**
     * @param part Partition.
     * @param msg Message.
     */
    public GridDhtInvalidPartitionException(int part, String msg) {
        super(msg);

        this.part = part;
    }

    /**
     * @return Partition.
     */
    public int partition() {
        return part;
    }

    /**
     * Formats the exception as {@code <class name> [part=<partition>, msg=<message>]}.
     *
     * @return String representation of this exception.
     */
    @Override public String toString() {
        // Use the class name rather than the Class object: concatenating the Class object
        // itself would prepend a stray "class " to the output.
        return getClass().getName() + " [part=" + part + ", msg=" + getMessage() + ']';
    }
}
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.IgniteSystemProperties.*; -import static org.apache.ignite.events.IgniteEventType.*; -import static org.gridgain.grid.kernal.processors.cache.distributed.dht.GridDhtPartitionState.*; - -/** - * Key partition. - */ -public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalPartition> { - /** Maximum size for delete queue. */ - private static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(GG_ATOMIC_CACHE_DELETE_HISTORY_SIZE, - 200_000); - - /** Static logger to avoid re-creation. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Logger. */ - private static volatile IgniteLogger log; - - /** Partition ID. */ - private final int id; - - /** State. */ - @GridToStringExclude - private AtomicStampedReference<GridDhtPartitionState> state = - new AtomicStampedReference<>(MOVING, 0); - - /** Rent future. */ - @GridToStringExclude - private final GridFutureAdapter<?> rent; - - /** Entries map. */ - private final ConcurrentMap<K, GridDhtCacheEntry<K, V>> map; - - /** Context. */ - private final GridCacheContext<K, V> cctx; - - /** Create time. 
*/ - @GridToStringExclude - private final long createTime = U.currentTimeMillis(); - - /** Eviction history. */ - private volatile Map<K, GridCacheVersion> evictHist = new HashMap<>(); - - /** Lock. */ - private final ReentrantLock lock = new ReentrantLock(); - - /** Public size counter. */ - private final LongAdder mapPubSize = new LongAdder(); - - /** Remove queue. */ - private GridCircularBuffer<T2<K, GridCacheVersion>> rmvQueue; - - /** - * @param cctx Context. - * @param id Partition ID. - */ - @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - GridDhtLocalPartition(GridCacheContext<K, V> cctx, int id) { - assert cctx != null; - - this.id = id; - this.cctx = cctx; - - log = U.logger(cctx.kernalContext(), logRef, this); - - rent = new GridFutureAdapter<Object>(cctx.kernalContext()) { - @Override public String toString() { - return "PartitionRentFuture [part=" + GridDhtLocalPartition.this + ", map=" + map + ']'; - } - }; - - map = new ConcurrentHashMap8<>(cctx.config().getStartSize() / - cctx.affinity().partitions()); - - int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 : - Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20); - - rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize)); - } - - /** - * @return Partition ID. - */ - public int id() { - return id; - } - - /** - * @return Create time. - */ - long createTime() { - return createTime; - } - - /** - * @return Partition state. - */ - public GridDhtPartitionState state() { - return state.getReference(); - } - - /** - * @return Reservations. - */ - public int reservations() { - return state.getStamp(); - } - - /** - * @return Entries belonging to partition. - */ - public Collection<GridDhtCacheEntry<K, V>> entries() { - return map.values(); - } - - /** - * @return {@code True} if partition is empty. - */ - public boolean isEmpty() { - return map.isEmpty(); - } - - /** - * @return Number of entries in this partition (constant-time method). 
- */ - public int size() { - return map.size(); - } - - /** - * Increments public size of the map. - */ - public void incrementPublicSize() { - mapPubSize.increment(); - } - - /** - * Decrements public size of the map. - */ - public void decrementPublicSize() { - mapPubSize.decrement(); - } - - /** - * @return Number of public (non-internal) entries in this partition. - */ - public int publicSize() { - return mapPubSize.intValue(); - } - - /** - * @return If partition is moving or owning or renting. - */ - public boolean valid() { - GridDhtPartitionState state = state(); - - return state == MOVING || state == OWNING || state == RENTING; - } - - /** - * @param entry Entry to add. - */ - void onAdded(GridDhtCacheEntry<K, V> entry) { - GridDhtPartitionState state = state(); - - assert state != EVICTED : "Adding entry to invalid partition: " + this; - - map.put(entry.key(), entry); - - if (!entry.isInternal()) - mapPubSize.increment(); - } - - /** - * @param entry Entry to remove. - */ - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - void onRemoved(GridDhtCacheEntry<K, V> entry) { - assert entry.obsolete(); - - // Make sure to remove exactly this entry. - synchronized (entry) { - map.remove(entry.key(), entry); - - if (!entry.isInternal() && !entry.deleted()) - mapPubSize.decrement(); - } - - // Attempt to evict. - tryEvict(true); - } - - /** - * @param key Removed key. - * @param ver Removed version. - * @throws IgniteCheckedException If failed. - */ - public void onDeferredDelete(K key, GridCacheVersion ver) throws IgniteCheckedException { - try { - T2<K, GridCacheVersion> evicted = rmvQueue.add(new T2<>(key, ver)); - - if (evicted != null) - cctx.dht().removeVersionedEntry(evicted.get1(), evicted.get2()); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedException(e); - } - } - - /** - * Locks partition. 
- */ - @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"}) - public void lock() { - lock.lock(); - } - - /** - * Unlocks partition. - */ - public void unlock() { - lock.unlock(); - } - - /** - * @param key Key. - * @param ver Version. - */ - public void onEntryEvicted(K key, GridCacheVersion ver) { - assert key != null; - assert ver != null; - assert lock.isHeldByCurrentThread(); // Only one thread can enter this method at a time. - - if (state() != MOVING) - return; - - Map<K, GridCacheVersion> evictHist0 = evictHist; - - if (evictHist0 != null ) { - GridCacheVersion ver0 = evictHist0.get(key); - - if (ver0 == null || ver0.isLess(ver)) { - GridCacheVersion ver1 = evictHist0.put(key, ver); - - assert ver1 == ver0; - } - } - } - - /** - * Cache preloader should call this method within partition lock. - * - * @param key Key. - * @param ver Version. - * @return {@code True} if preloading is permitted. - */ - public boolean preloadingPermitted(K key, GridCacheVersion ver) { - assert key != null; - assert ver != null; - assert lock.isHeldByCurrentThread(); // Only one thread can enter this method at a time. - - if (state() != MOVING) - return false; - - Map<K, GridCacheVersion> evictHist0 = evictHist; - - if (evictHist0 != null) { - GridCacheVersion ver0 = evictHist0.get(key); - - // Permit preloading if version in history - // is missing or less than passed in. - return ver0 == null || ver0.isLess(ver); - } - - return false; - } - - /** - * Reserves a partition so it won't be cleared. - * - * @return {@code True} if reserved. - */ - public boolean reserve() { - while (true) { - int reservations = state.getStamp(); - - GridDhtPartitionState s = state.getReference(); - - if (s == EVICTED) - return false; - - if (state.compareAndSet(s, s, reservations, reservations + 1)) - return true; - } - } - - /** - * Releases previously reserved partition. 
- */ - public void release() { - while (true) { - int reservations = state.getStamp(); - - if (reservations == 0) - return; - - GridDhtPartitionState s = state.getReference(); - - assert s != EVICTED; - - // Decrement reservations. - if (state.compareAndSet(s, s, reservations, --reservations)) { - tryEvict(true); - - break; - } - } - } - - /** - * @return {@code True} if transitioned to OWNING state. - */ - boolean own() { - while (true) { - int reservations = state.getStamp(); - - GridDhtPartitionState s = state.getReference(); - - if (s == RENTING || s == EVICTED) - return false; - - if (s == OWNING) - return true; - - assert s == MOVING; - - if (state.compareAndSet(MOVING, OWNING, reservations, reservations)) { - if (log.isDebugEnabled()) - log.debug("Owned partition: " + this); - - // No need to keep history any more. - evictHist = null; - - return true; - } - } - } - - /** - * @param updateSeq Update sequence. - * @return Future to signal that this node is no longer an owner or backup. - */ - IgniteFuture<?> rent(boolean updateSeq) { - while (true) { - int reservations = state.getStamp(); - - GridDhtPartitionState s = state.getReference(); - - if (s == RENTING || s == EVICTED) - return rent; - - if (state.compareAndSet(s, RENTING, reservations, reservations)) { - if (log.isDebugEnabled()) - log.debug("Moved partition to RENTING state: " + this); - - // Evict asynchronously, as the 'rent' method may be called - // from within write locks on local partition. - tryEvictAsync(updateSeq); - - break; - } - } - - return rent; - } - - /** - * @param updateSeq Update sequence. - * @return Future for evict attempt. 
- */ - private IgniteFuture<Boolean> tryEvictAsync(boolean updateSeq) { - if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) { - if (log.isDebugEnabled()) - log.debug("Evicted partition: " + this); - - clearSwap(); - - if (cctx.isDrEnabled()) - cctx.dr().partitionEvicted(id); - - cctx.dataStructures().onPartitionEvicted(id); - - rent.onDone(); - - ((GridDhtPreloader<K, V>)cctx.preloader()).onPartitionEvicted(this, updateSeq); - - clearDeferredDeletes(); - - return new GridFinishedFuture<>(cctx.kernalContext(), true); - } - - return cctx.closures().callLocalSafe(new GPC<Boolean>() { - @Override public Boolean call() { - return tryEvict(true); - } - }, /*system pool*/ true); - } - - /** - * @param updateSeq Update sequence. - * @return {@code True} if entry has been transitioned to state EVICTED. - */ - private boolean tryEvict(boolean updateSeq) { - // Attempt to evict partition entries from cache. - if (state.getReference() == RENTING && state.getStamp() == 0) - clearAll(); - - if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) { - if (log.isDebugEnabled()) - log.debug("Evicted partition: " + this); - - clearSwap(); - - if (cctx.isDrEnabled()) - cctx.dr().partitionEvicted(id); - - cctx.dataStructures().onPartitionEvicted(id); - - rent.onDone(); - - ((GridDhtPreloader<K, V>)cctx.preloader()).onPartitionEvicted(this, updateSeq); - - clearDeferredDeletes(); - - return true; - } - - return false; - } - - /** - * Clears swap entries for evicted partition. - */ - private void clearSwap() { - assert state() == EVICTED; - - try { - GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> it = cctx.swap().iterator(id, false); - - if (it != null) { - // We can safely remove these values because no entries will be created for evicted partition. 
- while (it.hasNext()) { - Map.Entry<byte[], GridCacheSwapEntry<V>> entry = it.next(); - - byte[] keyBytes = entry.getKey(); - - K key = cctx.marshaller().unmarshal(keyBytes, cctx.deploy().globalLoader()); - - cctx.swap().remove(key, keyBytes); - } - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to clear swap for evicted partition: " + this, e); - } - } - - /** - * - */ - void onUnlock() { - tryEvict(true); - } - - /** - * @param topVer Topology version. - * @return {@code True} if local node is primary for this partition. - */ - public boolean primary(long topVer) { - return cctx.affinity().primary(cctx.localNode(), id, topVer); - } - - /** - * Clears values for this partition. - */ - private void clearAll() { - GridCacheVersion clearVer = cctx.versions().next(); - - boolean swap = cctx.isSwapOrOffheapEnabled(); - - boolean rec = cctx.events().isRecordable(EVT_CACHE_PRELOAD_OBJECT_UNLOADED); - - for (Iterator<GridDhtCacheEntry<K, V>> it = map.values().iterator(); it.hasNext();) { - GridDhtCacheEntry<K, V> cached = it.next(); - - try { - if (cached.clearInternal(clearVer, swap)) { - it.remove(); - - if (!cached.isInternal()) { - mapPubSize.decrement(); - - if (rec) - cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), (IgniteUuid)null, - null, EVT_CACHE_PRELOAD_OBJECT_UNLOADED, null, false, cached.rawGet(), - cached.hasValue(), null, null, null); - } - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e); - } - } - } - - /** - * - */ - private void clearDeferredDeletes() { - rmvQueue.forEach(new CI1<T2<K, GridCacheVersion>>() { - @Override public void apply(T2<K, GridCacheVersion> t) { - cctx.dht().removeVersionedEntry(t.get1(), t.get2()); - } - }); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return id; - } - - /** {@inheritDoc} */ - @SuppressWarnings( {"OverlyStrongTypeCast"}) - @Override public boolean equals(Object obj) { - 
return obj instanceof GridDhtLocalPartition && (obj == this || ((GridDhtLocalPartition)obj).id() == id); - } - - /** {@inheritDoc} */ - @Override public int compareTo(@NotNull GridDhtLocalPartition part) { - if (part == null) - return 1; - - return id == part.id() ? 0 : id > part.id() ? 1 : -1; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtLocalPartition.class, this, - "state", state(), - "reservations", reservations(), - "empty", map.isEmpty(), - "createTime", U.format(createTime), - "mapPubSize", mapPubSize); - } -}