http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtInvalidPartitionException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtInvalidPartitionException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtInvalidPartitionException.java new file mode 100644 index 0000000..2b5e6da --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtInvalidPartitionException.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +/** + * Exception thrown whenever entry is created for invalid partition. + */ +public class GridDhtInvalidPartitionException extends RuntimeException { + /** */ + private static final long serialVersionUID = 0L; + + /** Partition. */ + private final int part; + + /** + * @param part Partition. + * @param msg Message. + */ + public GridDhtInvalidPartitionException(int part, String msg) { + super(msg); + + this.part = part; + } + + /** + * @return Partition. + */ + public int partition() { + return part; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return getClass() + " [part=" + part + ", msg=" + getMessage() + ']'; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java new file mode 100644 index 0000000..a8f3125 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -0,0 +1,594 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + +/** + * Key partition. + */ +public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalPartition> { + /** Maximum size for delete queue. */ + private static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(GG_ATOMIC_CACHE_DELETE_HISTORY_SIZE, + 200_000); + + /** Static logger to avoid re-creation. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + private static volatile IgniteLogger log; + + /** Partition ID. */ + private final int id; + + /** State. */ + @GridToStringExclude + private AtomicStampedReference<GridDhtPartitionState> state = + new AtomicStampedReference<>(MOVING, 0); + + /** Rent future. */ + @GridToStringExclude + private final GridFutureAdapter<?> rent; + + /** Entries map. */ + private final ConcurrentMap<K, GridDhtCacheEntry<K, V>> map; + + /** Context. */ + private final GridCacheContext<K, V> cctx; + + /** Create time. */ + @GridToStringExclude + private final long createTime = U.currentTimeMillis(); + + /** Eviction history. */ + private volatile Map<K, GridCacheVersion> evictHist = new HashMap<>(); + + /** Lock. */ + private final ReentrantLock lock = new ReentrantLock(); + + /** Public size counter. */ + private final LongAdder mapPubSize = new LongAdder(); + + /** Remove queue. */ + private GridCircularBuffer<T2<K, GridCacheVersion>> rmvQueue; + + /** + * @param cctx Context. + * @param id Partition ID. + */ + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") + GridDhtLocalPartition(GridCacheContext<K, V> cctx, int id) { + assert cctx != null; + + this.id = id; + this.cctx = cctx; + + log = U.logger(cctx.kernalContext(), logRef, this); + + rent = new GridFutureAdapter<Object>(cctx.kernalContext()) { + @Override public String toString() { + return "PartitionRentFuture [part=" + GridDhtLocalPartition.this + ", map=" + map + ']'; + } + }; + + map = new ConcurrentHashMap8<>(cctx.config().getStartSize() / + cctx.affinity().partitions()); + + int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 : + Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20); + + rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize)); + } + + /** + * @return Partition ID. + */ + public int id() { + return id; + } + + /** + * @return Create time. + */ + long createTime() { + return createTime; + } + + /** + * @return Partition state. + */ + public GridDhtPartitionState state() { + return state.getReference(); + } + + /** + * @return Reservations. + */ + public int reservations() { + return state.getStamp(); + } + + /** + * @return Entries belonging to partition. + */ + public Collection<GridDhtCacheEntry<K, V>> entries() { + return map.values(); + } + + /** + * @return {@code True} if partition is empty. + */ + public boolean isEmpty() { + return map.isEmpty(); + } + + /** + * @return Number of entries in this partition (constant-time method). + */ + public int size() { + return map.size(); + } + + /** + * Increments public size of the map. + */ + public void incrementPublicSize() { + mapPubSize.increment(); + } + + /** + * Decrements public size of the map. + */ + public void decrementPublicSize() { + mapPubSize.decrement(); + } + + /** + * @return Number of public (non-internal) entries in this partition. + */ + public int publicSize() { + return mapPubSize.intValue(); + } + + /** + * @return If partition is moving or owning or renting. + */ + public boolean valid() { + GridDhtPartitionState state = state(); + + return state == MOVING || state == OWNING || state == RENTING; + } + + /** + * @param entry Entry to add. + */ + void onAdded(GridDhtCacheEntry<K, V> entry) { + GridDhtPartitionState state = state(); + + assert state != EVICTED : "Adding entry to invalid partition: " + this; + + map.put(entry.key(), entry); + + if (!entry.isInternal()) + mapPubSize.increment(); + } + + /** + * @param entry Entry to remove. + */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + void onRemoved(GridDhtCacheEntry<K, V> entry) { + assert entry.obsolete(); + + // Make sure to remove exactly this entry. + synchronized (entry) { + map.remove(entry.key(), entry); + + if (!entry.isInternal() && !entry.deleted()) + mapPubSize.decrement(); + } + + // Attempt to evict. + tryEvict(true); + } + + /** + * @param key Removed key. + * @param ver Removed version. + * @throws IgniteCheckedException If failed. + */ + public void onDeferredDelete(K key, GridCacheVersion ver) throws IgniteCheckedException { + try { + T2<K, GridCacheVersion> evicted = rmvQueue.add(new T2<>(key, ver)); + + if (evicted != null) + cctx.dht().removeVersionedEntry(evicted.get1(), evicted.get2()); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException(e); + } + } + + /** + * Locks partition. + */ + @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"}) + public void lock() { + lock.lock(); + } + + /** + * Unlocks partition. + */ + public void unlock() { + lock.unlock(); + } + + /** + * @param key Key. + * @param ver Version. + */ + public void onEntryEvicted(K key, GridCacheVersion ver) { + assert key != null; + assert ver != null; + assert lock.isHeldByCurrentThread(); // Only one thread can enter this method at a time. + + if (state() != MOVING) + return; + + Map<K, GridCacheVersion> evictHist0 = evictHist; + + if (evictHist0 != null ) { + GridCacheVersion ver0 = evictHist0.get(key); + + if (ver0 == null || ver0.isLess(ver)) { + GridCacheVersion ver1 = evictHist0.put(key, ver); + + assert ver1 == ver0; + } + } + } + + /** + * Cache preloader should call this method within partition lock. + * + * @param key Key. + * @param ver Version. + * @return {@code True} if preloading is permitted. + */ + public boolean preloadingPermitted(K key, GridCacheVersion ver) { + assert key != null; + assert ver != null; + assert lock.isHeldByCurrentThread(); // Only one thread can enter this method at a time. + + if (state() != MOVING) + return false; + + Map<K, GridCacheVersion> evictHist0 = evictHist; + + if (evictHist0 != null) { + GridCacheVersion ver0 = evictHist0.get(key); + + // Permit preloading if version in history + // is missing or less than passed in. + return ver0 == null || ver0.isLess(ver); + } + + return false; + } + + /** + * Reserves a partition so it won't be cleared. + * + * @return {@code True} if reserved. + */ + public boolean reserve() { + while (true) { + int reservations = state.getStamp(); + + GridDhtPartitionState s = state.getReference(); + + if (s == EVICTED) + return false; + + if (state.compareAndSet(s, s, reservations, reservations + 1)) + return true; + } + } + + /** + * Releases previously reserved partition. + */ + public void release() { + while (true) { + int reservations = state.getStamp(); + + if (reservations == 0) + return; + + GridDhtPartitionState s = state.getReference(); + + assert s != EVICTED; + + // Decrement reservations. + if (state.compareAndSet(s, s, reservations, --reservations)) { + tryEvict(true); + + break; + } + } + } + + /** + * @return {@code True} if transitioned to OWNING state. + */ + boolean own() { + while (true) { + int reservations = state.getStamp(); + + GridDhtPartitionState s = state.getReference(); + + if (s == RENTING || s == EVICTED) + return false; + + if (s == OWNING) + return true; + + assert s == MOVING; + + if (state.compareAndSet(MOVING, OWNING, reservations, reservations)) { + if (log.isDebugEnabled()) + log.debug("Owned partition: " + this); + + // No need to keep history any more. + evictHist = null; + + return true; + } + } + } + + /** + * @param updateSeq Update sequence. + * @return Future to signal that this node is no longer an owner or backup. + */ + IgniteFuture<?> rent(boolean updateSeq) { + while (true) { + int reservations = state.getStamp(); + + GridDhtPartitionState s = state.getReference(); + + if (s == RENTING || s == EVICTED) + return rent; + + if (state.compareAndSet(s, RENTING, reservations, reservations)) { + if (log.isDebugEnabled()) + log.debug("Moved partition to RENTING state: " + this); + + // Evict asynchronously, as the 'rent' method may be called + // from within write locks on local partition. + tryEvictAsync(updateSeq); + + break; + } + } + + return rent; + } + + /** + * @param updateSeq Update sequence. + * @return Future for evict attempt. + */ + private IgniteFuture<Boolean> tryEvictAsync(boolean updateSeq) { + if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) { + if (log.isDebugEnabled()) + log.debug("Evicted partition: " + this); + + clearSwap(); + + if (cctx.isDrEnabled()) + cctx.dr().partitionEvicted(id); + + cctx.dataStructures().onPartitionEvicted(id); + + rent.onDone(); + + ((GridDhtPreloader<K, V>)cctx.preloader()).onPartitionEvicted(this, updateSeq); + + clearDeferredDeletes(); + + return new GridFinishedFuture<>(cctx.kernalContext(), true); + } + + return cctx.closures().callLocalSafe(new GPC<Boolean>() { + @Override public Boolean call() { + return tryEvict(true); + } + }, /*system pool*/ true); + } + + /** + * @param updateSeq Update sequence. + * @return {@code True} if entry has been transitioned to state EVICTED. + */ + private boolean tryEvict(boolean updateSeq) { + // Attempt to evict partition entries from cache. + if (state.getReference() == RENTING && state.getStamp() == 0) + clearAll(); + + if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) { + if (log.isDebugEnabled()) + log.debug("Evicted partition: " + this); + + clearSwap(); + + if (cctx.isDrEnabled()) + cctx.dr().partitionEvicted(id); + + cctx.dataStructures().onPartitionEvicted(id); + + rent.onDone(); + + ((GridDhtPreloader<K, V>)cctx.preloader()).onPartitionEvicted(this, updateSeq); + + clearDeferredDeletes(); + + return true; + } + + return false; + } + + /** + * Clears swap entries for evicted partition. + */ + private void clearSwap() { + assert state() == EVICTED; + + try { + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> it = cctx.swap().iterator(id, false); + + if (it != null) { + // We can safely remove these values because no entries will be created for evicted partition. + while (it.hasNext()) { + Map.Entry<byte[], GridCacheSwapEntry<V>> entry = it.next(); + + byte[] keyBytes = entry.getKey(); + + K key = cctx.marshaller().unmarshal(keyBytes, cctx.deploy().globalLoader()); + + cctx.swap().remove(key, keyBytes); + } + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to clear swap for evicted partition: " + this, e); + } + } + + /** + * + */ + void onUnlock() { + tryEvict(true); + } + + /** + * @param topVer Topology version. + * @return {@code True} if local node is primary for this partition. + */ + public boolean primary(long topVer) { + return cctx.affinity().primary(cctx.localNode(), id, topVer); + } + + /** + * Clears values for this partition. + */ + private void clearAll() { + GridCacheVersion clearVer = cctx.versions().next(); + + boolean swap = cctx.isSwapOrOffheapEnabled(); + + boolean rec = cctx.events().isRecordable(EVT_CACHE_PRELOAD_OBJECT_UNLOADED); + + for (Iterator<GridDhtCacheEntry<K, V>> it = map.values().iterator(); it.hasNext();) { + GridDhtCacheEntry<K, V> cached = it.next(); + + try { + if (cached.clearInternal(clearVer, swap)) { + it.remove(); + + if (!cached.isInternal()) { + mapPubSize.decrement(); + + if (rec) + cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), (IgniteUuid)null, + null, EVT_CACHE_PRELOAD_OBJECT_UNLOADED, null, false, cached.rawGet(), + cached.hasValue(), null, null, null); + } + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e); + } + } + } + + /** + * + */ + private void clearDeferredDeletes() { + rmvQueue.forEach(new CI1<T2<K, GridCacheVersion>>() { + @Override public void apply(T2<K, GridCacheVersion> t) { + cctx.dht().removeVersionedEntry(t.get1(), t.get2()); + } + }); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + + /** {@inheritDoc} */ + @SuppressWarnings( {"OverlyStrongTypeCast"}) + @Override public boolean equals(Object obj) { + return obj instanceof GridDhtLocalPartition && (obj == this || ((GridDhtLocalPartition)obj).id() == id); + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull GridDhtLocalPartition part) { + if (part == null) + return 1; + + return id == part.id() ? 0 : id > part.id() ? 1 : -1; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtLocalPartition.class, this, + "state", state(), + "reservations", reservations(), + "empty", map.isEmpty(), + "createTime", U.format(createTime), + "mapPubSize", mapPubSize); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java new file mode 100644 index 0000000..fe279d6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -0,0 +1,1235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.processors.timeout.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; +import static org.apache.ignite.internal.processors.dr.GridDrType.*; + +/** + * Cache lock future. + */ +public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> + implements GridCacheMvccFuture<K, V, Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion { + /** */ + private static final long serialVersionUID = 0L; + + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Cache registry. */ + @GridToStringExclude + private GridCacheContext<K, V> cctx; + + /** Near node ID. */ + private UUID nearNodeId; + + /** Near lock version. */ + private GridCacheVersion nearLockVer; + + /** Topology version. */ + private long topVer; + + /** Thread. */ + private long threadId; + + /** Keys locked so far. */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + @GridToStringExclude + private List<GridDhtCacheEntry<K, V>> entries; + + /** Near mappings. */ + private Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearMap = + new ConcurrentHashMap8<>(); + + /** DHT mappings. */ + private Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtMap = + new ConcurrentHashMap8<>(); + + /** Future ID. */ + private IgniteUuid futId; + + /** Lock version. */ + private GridCacheVersion lockVer; + + /** Read flag. */ + private boolean read; + + /** Error. */ + private AtomicReference<Throwable> err = new AtomicReference<>(null); + + /** Timed out flag. */ + private volatile boolean timedOut; + + /** Timeout object. */ + @GridToStringExclude + private LockTimeoutObject timeoutObj; + + /** Lock timeout. */ + private long timeout; + + /** Logger. */ + @GridToStringExclude + private IgniteLogger log; + + /** Filter. */ + private IgnitePredicate<GridCacheEntry<K, V>>[] filter; + + /** Transaction. */ + private GridDhtTxLocalAdapter<K, V> tx; + + /** All replies flag. */ + private AtomicBoolean mapped = new AtomicBoolean(false); + + /** */ + private Collection<Integer> invalidParts = new GridLeanSet<>(); + + /** Trackable flag. */ + private boolean trackable = true; + + /** Mutex. */ + private final Object mux = new Object(); + + /** Pending locks. */ + private final Collection<K> pendingLocks = new GridConcurrentHashSet<>(); + + /** TTL for read operation. */ + private long accessTtl; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridDhtLockFuture() { + // No-op. + } + + /** + * @param cctx Cache context. + * @param nearNodeId Near node ID. + * @param nearLockVer Near lock version. + * @param topVer Topology version. + * @param cnt Number of keys to lock. + * @param read Read flag. + * @param timeout Lock acquisition timeout. + * @param tx Transaction. + * @param threadId Thread ID. + * @param accessTtl TTL for read operation. + * @param filter Filter. + */ + public GridDhtLockFuture( + GridCacheContext<K, V> cctx, + UUID nearNodeId, + GridCacheVersion nearLockVer, + long topVer, + int cnt, + boolean read, + long timeout, + GridDhtTxLocalAdapter<K, V> tx, + long threadId, + long accessTtl, + IgnitePredicate<GridCacheEntry<K, V>>[] filter) { + super(cctx.kernalContext(), CU.boolReducer()); + + assert nearNodeId != null; + assert nearLockVer != null; + assert topVer > 0; + + this.cctx = cctx; + this.nearNodeId = nearNodeId; + this.nearLockVer = nearLockVer; + this.topVer = topVer; + this.read = read; + this.timeout = timeout; + this.filter = filter; + this.tx = tx; + this.accessTtl = accessTtl; + + if (tx != null) + tx.topologyVersion(topVer); + + assert tx == null || threadId == tx.threadId(); + + this.threadId = threadId; + + if (tx != null) + lockVer = tx.xidVersion(); + else { + lockVer = cctx.mvcc().mappedVersion(nearLockVer); + + if (lockVer == null) + lockVer = cctx.versions().onReceivedAndNext(nearNodeId, nearLockVer); + } + + futId = IgniteUuid.randomUuid(); + + entries = new ArrayList<>(cnt); + + log = U.logger(ctx, logRef, GridDhtLockFuture.class); + + if (timeout > 0) { + timeoutObj = new LockTimeoutObject(); + + cctx.time().addTimeoutObject(timeoutObj); + } + } + + /** {@inheritDoc} */ + @Override public Collection<Integer> invalidPartitions() { + return invalidParts; + } + + /** + * @param cacheCtx Cache context. + * @param invalidPart Partition to retry. + */ + void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) { + invalidParts.add(invalidPart); + + // Register invalid partitions with transaction. + if (tx != null) + tx.addInvalidPartition(cacheCtx, invalidPart); + + if (log.isDebugEnabled()) + log.debug("Added invalid partition to future [invalidPart=" + invalidPart + ", fut=" + this + ']'); + } + + /** + * @return Participating nodes. + */ + @Override public Collection<? extends ClusterNode> nodes() { + return F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { + if (isMini(f)) + return ((MiniFuture)f).node(); + + return cctx.discovery().localNode(); + } + }); + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return lockVer; + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return trackable; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + trackable = false; + } + + /** + * @return Entries. + */ + public Collection<GridDhtCacheEntry<K, V>> entries() { + return F.view(entries, F.notNull()); + } + + /** + * @return Entries. + */ + public Collection<GridDhtCacheEntry<K, V>> entriesCopy() { + synchronized (mux) { + return new ArrayList<>(entries()); + } + } + + /** + * @return Future ID. + */ + @Override public IgniteUuid futureId() { + return futId; + } + + /** + * @return Near lock version. + */ + public GridCacheVersion nearLockVersion() { + return nearLockVer; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheVersion mappedVersion() { + return tx == null ? nearLockVer : null; + } + + /** + * @return {@code True} if transaction is not {@code null}. + */ + private boolean inTx() { + return tx != null; + } + + /** + * @return {@code True} if transaction is implicit. + */ + private boolean implicitSingle() { + return tx != null && tx.implicitSingle(); + } + + /** + * @return {@code True} if transaction is not {@code null} and has invalidate flag set. + */ + private boolean isInvalidate() { + return tx != null && tx.isInvalidate(); + } + + /** + * @return Transaction isolation or {@code null} if no transaction. + */ + @Nullable private IgniteTxIsolation isolation() { + return tx == null ? null : tx.isolation(); + } + + /** + * @param cached Entry. + * @return {@code True} if locked. + * @throws GridCacheEntryRemovedException If removed. + */ + private boolean locked(GridCacheEntryEx<K, V> cached) throws GridCacheEntryRemovedException { + return (cached.lockedLocally(lockVer) && filter(cached)); // If filter failed, lock is failed. + } + + /** + * @param cached Entry. + * @param owner Lock owner. + * @return {@code True} if locked. + */ + private boolean locked(GridCacheEntryEx<K, V> cached, GridCacheMvccCandidate<K> owner) { + // Reentry-aware check (if filter failed, lock is failed). + return owner != null && owner.matches(lockVer, cctx.nodeId(), threadId) && filter(cached); + } + + /** + * Adds entry to future. + * + * @param entry Entry to add. + * @return Lock candidate. + * @throws GridCacheEntryRemovedException If entry was removed. + * @throws GridDistributedLockCancelledException If lock is canceled. + */ + @Nullable public GridCacheMvccCandidate<K> addEntry(GridDhtCacheEntry<K, V> entry) + throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { + if (log.isDebugEnabled()) + log.debug("Adding entry: " + entry); + + if (entry == null) + return null; + + // Check if the future is timed out. + if (timedOut) + return null; + + // Add local lock first, as it may throw GridCacheEntryRemovedException. + GridCacheMvccCandidate<K> c = entry.addDhtLocal( + nearNodeId, + nearLockVer, + topVer, + threadId, + lockVer, + timeout, + /*reenter*/false, + inTx(), + implicitSingle() + ); + + if (c == null && timeout < 0) { + if (log.isDebugEnabled()) + log.debug("Failed to acquire lock with negative timeout: " + entry); + + onFailed(false); + + return null; + } + + synchronized (mux) { + entries.add(c == null || c.reentry() ? null : entry); + } + + if (c != null && !c.reentry()) + pendingLocks.add(entry.key()); + + // Double check if the future has already timed out. + if (timedOut) { + entry.removeLock(lockVer); + + return null; + } + + return c; + } + + /** + * Undoes all locks. + * + * @param dist If {@code true}, then remove locks from remote nodes as well. + */ + private void undoLocks(boolean dist) { + // Transactions will undo during rollback. + Collection<GridDhtCacheEntry<K, V>> entriesCp = entriesCopy(); + + if (dist && tx == null) { + cctx.dhtTx().removeLocks(nearNodeId, lockVer, F.viewReadOnly(entriesCp, + new C1<GridDhtCacheEntry<K, V>, K>() { + @Override public K apply(GridDhtCacheEntry<K, V> e) { + return e.key(); + } + }), false); + } + else { + if (tx != null) { + if (tx.setRollbackOnly()) { + if (log.isDebugEnabled()) + log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx); + } + else if (log.isDebugEnabled()) + log.debug("Transaction was not marked rollback-only while locks were not acquired: " + tx); + } + + for (GridCacheEntryEx<K, V> e : entriesCp) { + try { + e.removeLock(lockVer); + } + catch (GridCacheEntryRemovedException ignored) { + while (true) { + try { + e = cctx.cache().peekEx(e.key()); + + if (e != null) + e.removeLock(lockVer); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Attempted to remove lock on removed entry (will retry) [ver=" + + lockVer + ", entry=" + e + ']'); + } + } + } + } + } + } + + /** + * + * @param dist {@code True} if need to distribute lock release. + */ + private void onFailed(boolean dist) { + undoLocks(dist); + + onComplete(false); + } + + /** + * @param nodeId Left node ID + * @return {@code True} if node was in the list. + */ + @SuppressWarnings({"ThrowableInstanceNeverThrown"}) + @Override public boolean onNodeLeft(UUID nodeId) { + boolean found = false; + + for (IgniteFuture<?> fut : futures()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture)fut; + + if (f.node().id().equals(nodeId)) { + f.onResult(new ClusterTopologyException("Remote node left grid (will ignore): " + nodeId)); + + found = true; + } + } + } + + return found; + } + + /** + * @param nodeId Sender. + * @param res Result. + */ + void onResult(UUID nodeId, GridDhtLockResponse<K, V> res) { + if (!isDone()) { + if (log.isDebugEnabled()) + log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']'); + + boolean found = false; + + for (IgniteFuture<Boolean> fut : pending()) { + if (isMini(fut)) { + MiniFuture mini = (MiniFuture)fut; + + if (mini.futureId().equals(res.miniId())) { + assert mini.node().id().equals(nodeId); + + if (log.isDebugEnabled()) + log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']'); + + found = true; + + mini.onResult(res); + + if (log.isDebugEnabled()) + log.debug("Futures after processed lock response [fut=" + this + ", mini=" + mini + + ", res=" + res + ']'); + + break; + } + } + } + + if (!found) + U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res + + ", fut=" + this + ']'); + } + } + + /** + * Sets all local locks as ready. After local locks are acquired, lock requests will be sent to remote nodes. + * Thus, no reordering will occur for remote locks as they are added after local locks are acquired. + */ + private void readyLocks() { + if (log.isDebugEnabled()) + log.debug("Marking local locks as ready for DHT lock future: " + this); + + for (int i = 0; i < entries.size(); i++) { + while (true) { + GridDistributedCacheEntry<K, V> entry = entries.get(i); + + if (entry == null) + break; // While. + + try { + GridCacheMvccCandidate<K> owner = entry.readyLock(lockVer); + + if (timeout < 0) { + if (owner == null || !owner.version().equals(lockVer)) { + // We did not send any requests yet. + onFailed(false); + + return; + } + } + + if (log.isDebugEnabled()) { + if (!locked(entry, owner)) + log.debug("Entry is not locked (will keep waiting) [entry=" + entry + + ", fut=" + this + ']'); + } + + break; // Inner while loop. + } + // Possible in concurrent cases, when owner is changed after locks + // have been released or cancelled. + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to ready lock because entry was removed (will renew)."); + + entries.set(i, (GridDhtCacheEntry<K, V>)cctx.cache().entryEx(entry.key(), topVer)); + } + } + } + } + + /** + * @param e Error. + */ + public void onError(GridDistributedLockCancelledException e) { + if (err.compareAndSet(null, e)) + onComplete(false); + } + + /** + * @param t Error. + */ + public void onError(Throwable t) { + if (err.compareAndSet(null, t)) + onComplete(false); + } + + /** + * @param cached Entry to check. + * @return {@code True} if filter passed. + */ + private boolean filter(GridCacheEntryEx<K, V> cached) { + try { + if (!cctx.isAll(cached, filter)) { + if (log.isDebugEnabled()) + log.debug("Filter didn't pass for entry (will fail lock): " + cached); + + onFailed(true); + + return false; + } + + return true; + } + catch (IgniteCheckedException e) { + onError(e); + + return false; + } + } + + /** + * Callback for whenever entry lock ownership changes. + * + * @param entry Entry whose lock ownership changed. + */ + @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) { + if (isDone()) + return false; // Check other futures. + + if (log.isDebugEnabled()) + log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]"); + + if (owner != null && owner.version().equals(lockVer)) { + pendingLocks.remove(entry.key()); + + if (checkLocks()) + map(entries()); + + return true; + } + + return false; + } + + /** + * @return {@code True} if locks have been acquired. + */ + private boolean checkLocks() { + return pendingLocks.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public boolean cancel() { + if (onCancelled()) + onComplete(false); + + return isCancelled(); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Boolean success, @Nullable Throwable err) { + // Protect against NPE. + if (success == null) { + assert err != null; + + success = false; + } + + assert err == null || !success; + assert !success || (initialized() && !hasPending()) : "Invalid done callback [success=" + success + + ", fut=" + this + ']'; + + if (log.isDebugEnabled()) + log.debug("Received onDone(..) callback [success=" + success + ", err=" + err + ", fut=" + this + ']'); + + // If locks were not acquired yet, delay completion. + if (isDone() || (err == null && success && !checkLocks())) + return false; + + this.err.compareAndSet(null, err); + + return onComplete(success); + } + + /** + * Completeness callback. + * + * @param success {@code True} if lock was acquired. + * @return {@code True} if complete by this operation. + */ + private boolean onComplete(boolean success) { + if (log.isDebugEnabled()) + log.debug("Received onComplete(..) callback [success=" + success + ", fut=" + this + ']'); + + if (!success) + undoLocks(true); + + if (tx != null) + cctx.tm().txContext(tx); + + if (super.onDone(success, err.get())) { + if (log.isDebugEnabled()) + log.debug("Completing future: " + this); + + // Clean up. + cctx.mvcc().removeFuture(this); + + if (timeoutObj != null) + cctx.time().removeTimeoutObject(timeoutObj); + + return true; + } + + return false; + } + + /** + * @param f Future. + * @return {@code True} if mini-future. + */ + private boolean isMini(IgniteFuture<?> f) { + return f.getClass().equals(MiniFuture.class); + } + + /** + * + */ + public void map() { + if (F.isEmpty(entries)) { + onComplete(true); + + return; + } + + readyLocks(); + } + + /** + * @param entries Entries. + */ + private void map(Iterable<GridDhtCacheEntry<K, V>> entries) { + if (!mapped.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Will not map DHT lock future (other thread is mapping): " + this); + + return; + } + + try { + if (log.isDebugEnabled()) + log.debug("Mapping entry for DHT lock future: " + this); + + boolean hasRmtNodes = false; + + // Assign keys to primary nodes. + for (GridDhtCacheEntry<K, V> entry : entries) { + try { + while (true) { + try { + hasRmtNodes = cctx.dhtMap(nearNodeId, topVer, entry, log, dhtMap, nearMap); + + GridCacheMvccCandidate<K> cand = entry.mappings(lockVer, + F.nodeIds(F.concat(false, dhtMap.keySet(), nearMap.keySet()))); + + // Possible in case of lock cancellation. + if (cand == null) { + onFailed(false); + + // Will mark initialized in finally block. + return; + } + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry when mapping DHT lock future (will retry): " + entry); + + entry = cctx.dht().entryExx(entry.key(), topVer); + } + } + } + catch (GridDhtInvalidPartitionException e) { + assert false : "DHT lock should never get invalid partition [err=" + e + ", fut=" + this + ']'; + } + } + + if (tx != null) { + tx.addDhtMapping(dhtMap); + tx.addNearMapping(nearMap); + + tx.needsCompletedVersions(hasRmtNodes); + } + + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Mapping won't proceed because future is done: " + this); + + return; + } + + if (log.isDebugEnabled()) + log.debug("Mapped DHT lock future [dhtMap=" + F.nodeIds(dhtMap.keySet()) + ", nearMap=" + + F.nodeIds(nearMap.keySet()) + ", dhtLockFut=" + this + ']'); + + if (inTx() && tx.onePhaseCommit()) { + if (dhtMap.size() == 1 && nearMap.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("One-phase commit transaction mapped to single node (will send locks on commit): " + tx); + + // Will mark initialized in finally block. + return; + } + } + + // Create mini futures. + for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapped : dhtMap.entrySet()) { + ClusterNode n = mapped.getKey(); + + List<GridDhtCacheEntry<K, V>> dhtMapping = mapped.getValue(); + + int cnt = F.size(dhtMapping); + + if (cnt > 0) { + assert !n.id().equals(ctx.localNodeId()); + + List<GridDhtCacheEntry<K, V>> nearMapping = nearMap.get(n); + + MiniFuture fut = new MiniFuture(n, dhtMapping, nearMapping); + + GridDhtLockRequest<K, V> req = new GridDhtLockRequest<>( + cctx.cacheId(), + nearNodeId, + inTx() ? tx.nearXidVersion() : null, + threadId, + futId, + fut.futureId(), + lockVer, + topVer, + inTx(), + read, + isolation(), + isInvalidate(), + timeout, + cnt, + F.size(nearMapping), + inTx() ? tx.size() : cnt, + inTx() ? tx.groupLockKey() : null, + inTx() && tx.partitionLock(), + inTx() ? tx.subjectId() : null, + inTx() ? tx.taskNameHash() : 0, + read ? accessTtl : -1L); + + try { + for (ListIterator<GridDhtCacheEntry<K, V>> it = dhtMapping.listIterator(); it.hasNext();) { + GridDhtCacheEntry<K, V> e = it.next(); + + // Must unswap entry so that isNewLocked returns correct value. + e.unswap(true, false); + + boolean invalidateRdr = e.readerId(n.id()) != null; + + IgniteTxEntry<K, V> entry = tx != null ? tx.entry(e.txKey()) : null; + + req.addDhtKey( + e.key(), + e.getOrMarshalKeyBytes(), + tx != null ? tx.writeMap().get(e.txKey()) : null, + entry != null ? entry.drVersion() : null, + invalidateRdr, + cctx); + + try { + if (e.isNewLocked()) + // Mark last added key as needed to be preloaded. + req.markLastKeyForPreload(); + } + catch (GridCacheEntryRemovedException ex) { + assert false : "Entry cannot become obsolete when DHT local candidate is added " + + "[e=" + e + ", ex=" + ex + ']'; + } + + it.set(addOwned(req, e)); + } + + add(fut); // Append new future. + + if (log.isDebugEnabled()) + log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']'); + + cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + // Fail the whole thing. + if (e instanceof ClusterTopologyException) + fut.onResult((ClusterTopologyException)e); + else + fut.onResult(e); + } + } + } + + for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapped : nearMap.entrySet()) { + ClusterNode n = mapped.getKey(); + + List<GridDhtCacheEntry<K, V>> nearMapping = mapped.getValue(); + + int cnt = F.size(nearMapping); + + if (cnt > 0) { + MiniFuture fut = new MiniFuture(n, null, nearMapping); + + GridDhtLockRequest<K, V> req = new GridDhtLockRequest<>( + cctx.cacheId(), + nearNodeId, + inTx() ? tx.nearXidVersion() : null, + threadId, + futId, + fut.futureId(), + lockVer, + topVer, + inTx(), + read, + isolation(), + isInvalidate(), + timeout, + 0, + cnt, + inTx() ? tx.size() : cnt, + inTx() ? tx.groupLockKey() : null, + inTx() && tx.partitionLock(), + inTx() ? tx.subjectId() : null, + inTx() ? tx.taskNameHash() : 0, + read ? accessTtl : -1L); + + try { + for (ListIterator<GridDhtCacheEntry<K, V>> it = nearMapping.listIterator(); it.hasNext();) { + GridDhtCacheEntry<K, V> e = it.next(); + + req.addNearKey(e.key(), e.getOrMarshalKeyBytes(), cctx.shared()); + + it.set(addOwned(req, e)); + } + + add(fut); // Append new future. + + // Primary node can never be a reader. + assert !n.id().equals(ctx.localNodeId()); + + if (log.isDebugEnabled()) + log.debug("Sending DHT lock request to near node [node=" + n.id() + + ", req=" + req + ']'); + + cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + } + catch (ClusterTopologyException e) { + fut.onResult(e); + } + catch (IgniteCheckedException e) { + onError(e); + + break; // For + } + } + } + } + finally { + markInitialized(); + } + } + + /** + * @param req Request. + * @param e Entry. + * @return Entry. + * @throws IgniteCheckedException If failed. + */ + private GridDhtCacheEntry<K, V> addOwned(GridDhtLockRequest<K, V> req, GridDhtCacheEntry<K, V> e) + throws IgniteCheckedException { + while (true) { + try { + GridCacheMvccCandidate<K> added = e.candidate(lockVer); + + assert added != null; + assert added.dhtLocal(); + + if (added.ownerVersion() != null) + req.owned(e.key(), e.getOrMarshalKeyBytes(), added.ownerVersion()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry when creating DHT lock request (will retry): " + e); + + e = cctx.dht().entryExx(e.key(), topVer); + } + } + + return e; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return futId.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtLockFuture.class, this, super.toString()); + } + + /** + * Lock request timeout object. + */ + private class LockTimeoutObject extends GridTimeoutObjectAdapter { + /** + * Default constructor. + */ + LockTimeoutObject() { + super(timeout); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"ThrowableInstanceNeverThrown"}) + @Override public void onTimeout() { + if (log.isDebugEnabled()) + log.debug("Timed out waiting for lock response: " + this); + + timedOut = true; + + onComplete(false); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(LockTimeoutObject.class, this); + } + } + + /** + * Mini-future for get operations. Mini-futures are only waiting on a single + * node as opposed to multiple nodes. + */ + private class MiniFuture extends GridFutureAdapter<Boolean> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + + /** Node. */ + @GridToStringExclude + private ClusterNode node; + + /** DHT mapping. */ + @GridToStringInclude + private List<GridDhtCacheEntry<K, V>> dhtMapping; + + /** Near mapping. */ + @GridToStringInclude + private List<GridDhtCacheEntry<K, V>> nearMapping; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public MiniFuture() { + // No-op. + } + + /** + * @param node Node. + * @param dhtMapping Mapping. + * @param nearMapping nearMapping. + */ + MiniFuture(ClusterNode node, List<GridDhtCacheEntry<K, V>> dhtMapping, List<GridDhtCacheEntry<K, V>> nearMapping) { + super(cctx.kernalContext()); + + assert node != null; + + this.node = node; + this.dhtMapping = dhtMapping; + this.nearMapping = nearMapping; + } + + /** + * @return Future ID. + */ + IgniteUuid futureId() { + return futId; + } + + /** + * @return Node ID. + */ + public ClusterNode node() { + return node; + } + + /** + * @param e Error. + */ + void onResult(Throwable e) { + if (log.isDebugEnabled()) + log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); + + // Fail. + onDone(e); + } + + /** + * @param e Node failure. + */ + void onResult(ClusterTopologyException e) { + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will ignore): " + this); + + if (tx != null) + tx.removeMapping(node.id()); + + onDone(true); + } + + /** + * @param res Result callback. + */ + void onResult(GridDhtLockResponse<K, V> res) { + if (res.error() != null) + // Fail the whole compound future. + onError(res.error()); + else { + if (nearMapping != null && !F.isEmpty(res.nearEvicted())) { + if (tx != null) { + GridDistributedTxMapping<K, V> m = tx.nearMapping(node.id()); + + if (m != null) + m.evictReaders(res.nearEvicted()); + } + + evictReaders(cctx, res.nearEvicted(), node.id(), res.messageId(), nearMapping); + } + + Set<Integer> invalidParts = res.invalidPartitions(); + + // Removing mappings for invalid partitions. + if (!F.isEmpty(invalidParts)) { + for (Iterator<GridDhtCacheEntry<K, V>> it = dhtMapping.iterator(); it.hasNext();) { + GridDhtCacheEntry<K, V> entry = it.next(); + + if (invalidParts.contains(entry.partition())) { + it.remove(); + + if (log.isDebugEnabled()) + log.debug("Removed mapping for entry [nodeId=" + node.id() + ", entry=" + entry + + ", fut=" + GridDhtLockFuture.this + ']'); + + if (tx != null) + tx.removeDhtMapping(node.id(), entry); + } + } + + if (dhtMapping.isEmpty()) + dhtMap.remove(node); + } + + boolean replicate = cctx.isDrEnabled(); + + boolean rec = cctx.events().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED); + + for (GridCacheEntryInfo<K, V> info : res.preloadEntries()) { + try { + GridCacheEntryEx<K,V> entry = cctx.cache().entryEx(info.key(), topVer); + + if (entry.initialValue(info.value(), info.valueBytes(), info.version(), info.ttl(), + info.expireTime(), true, topVer, replicate ? DR_PRELOAD : DR_NONE)) { + if (rec && !entry.isInternal()) + cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), + (IgniteUuid)null, null, EVT_CACHE_PRELOAD_OBJECT_LOADED, info.value(), true, null, + false, null, null, null); + } + } + catch (IgniteCheckedException e) { + onDone(e); + + return; + } + catch (GridCacheEntryRemovedException e) { + assert false : "Entry cannot become obsolete when DHT local candidate is added " + + "[e=" + e + ", ex=" + e + ']'; + } + } + + // Finish mini future. + onDone(true); + } + } + + /** + * @param cacheCtx Context. + * @param keys Keys to evict readers for. + * @param nodeId Node ID. + * @param msgId Message ID. + * @param entries Entries to check. + */ + @SuppressWarnings({"ForLoopReplaceableByForEach"}) + private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey<K>> keys, UUID nodeId, long msgId, + @Nullable List<GridDhtCacheEntry<K, V>> entries) { + if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty()) + return; + + for (ListIterator<GridDhtCacheEntry<K, V>> it = entries.listIterator(); it.hasNext(); ) { + GridDhtCacheEntry<K, V> cached = it.next(); + + if (keys.contains(cached.txKey())) { + while (true) { + try { + cached.removeReader(nodeId, msgId); + + if (tx != null) + tx.removeNearMapping(nodeId, cached); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + GridDhtCacheEntry<K, V> e = cacheCtx.dht().peekExx(cached.key()); + + if (e == null) + break; + + it.set(e); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MiniFuture.class, this, "nodeId", node.id(), "super", super.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java new file mode 100644 index 0000000..df19eb6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -0,0 +1,596 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * DHT lock request. + */ +public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Near keys. */ + @GridToStringInclude + @GridDirectTransient + private List<K> nearKeys; + + /** Near keys to lock. */ + @GridToStringExclude + @GridDirectCollection(byte[].class) + private List<byte[]> nearKeyBytes; + + /** Invalidate reader flags. */ + private BitSet invalidateEntries; + + /** Mini future ID. */ + private IgniteUuid miniId; + + /** Owner mapped version, if any. */ + @GridToStringInclude + @GridDirectTransient + private Map<K, GridCacheVersion> owned; + + /** Owner mapped version bytes. */ + private byte[] ownedBytes; + + /** Topology version. */ + private long topVer; + + /** Subject ID. */ + @GridDirectVersion(1) + private UUID subjId; + + /** Task name hash. */ + @GridDirectVersion(2) + private int taskNameHash; + + /** Indexes of keys needed to be preloaded. */ + @GridDirectVersion(3) + private BitSet preloadKeys; + + /** TTL for read operation. */ + private long accessTtl; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridDhtLockRequest() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param nodeId Node ID. + * @param nearXidVer Near transaction ID. + * @param threadId Thread ID. + * @param futId Future ID. + * @param miniId Mini future ID. + * @param lockVer Cache version. + * @param topVer Topology version. + * @param isInTx {@code True} if implicit transaction lock. + * @param isRead Indicates whether implicit lock is for read or write operation. + * @param isolation Transaction isolation. + * @param isInvalidate Invalidation flag. + * @param timeout Lock timeout. + * @param dhtCnt DHT count. + * @param nearCnt Near count. + * @param txSize Expected transaction size. + * @param grpLockKey Group lock key. + * @param partLock {@code True} if partition lock. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param accessTtl TTL for read operation. + */ + public GridDhtLockRequest( + int cacheId, + UUID nodeId, + GridCacheVersion nearXidVer, + long threadId, + IgniteUuid futId, + IgniteUuid miniId, + GridCacheVersion lockVer, + long topVer, + boolean isInTx, + boolean isRead, + IgniteTxIsolation isolation, + boolean isInvalidate, + long timeout, + int dhtCnt, + int nearCnt, + int txSize, + @Nullable IgniteTxKey grpLockKey, + boolean partLock, + @Nullable UUID subjId, + int taskNameHash, + long accessTtl + ) { + super(cacheId, + nodeId, + nearXidVer, + threadId, + futId, + lockVer, + isInTx, + isRead, + isolation, + isInvalidate, + timeout, + dhtCnt == 0 ? nearCnt : dhtCnt, + txSize, + grpLockKey, + partLock); + + this.topVer = topVer; + + nearKeyBytes = nearCnt == 0 ? Collections.<byte[]>emptyList() : new ArrayList<byte[]>(nearCnt); + nearKeys = nearCnt == 0 ? Collections.<K>emptyList() : new ArrayList<K>(nearCnt); + invalidateEntries = new BitSet(dhtCnt == 0 ? nearCnt : dhtCnt); + + assert miniId != null; + + this.miniId = miniId; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.accessTtl = accessTtl; + } + + /** {@inheritDoc} */ + @Override public boolean allowForStartup() { + return true; + } + + /** + * @return Near node ID. + */ + public UUID nearNodeId() { + return nodeId(); + } + + /** + * @return Subject ID. + */ + public UUID subjectId() { + return subjId; + } + + /** + * @return Task name hash. + */ + public int taskNameHash() { + return taskNameHash; + } + + /** + * @return Topology version. + */ + @Override public long topologyVersion() { + return topVer; + } + + /** + * @return Near keys. + */ + public List<byte[]> nearKeyBytes() { + return nearKeyBytes == null ? Collections.<byte[]>emptyList() : nearKeyBytes; + } + + /** + * Adds a Near key. + * + * @param key Key. + * @param keyBytes Key bytes. + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + public void addNearKey(K key, byte[] keyBytes, GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + if (ctx.deploymentEnabled()) + prepareObject(key, ctx); + + nearKeys.add(key); + + if (keyBytes != null) + nearKeyBytes.add(keyBytes); + } + + /** + * @return Near keys. + */ + public List<K> nearKeys() { + return nearKeys == null ? Collections.<K>emptyList() : nearKeys; + } + + /** + * Adds a DHT key. + * + * @param key Key. + * @param keyBytes Key bytes. + * @param writeEntry Write entry. + * @param drVer DR version. + * @param invalidateEntry Flag indicating whether node should attempt to invalidate reader. + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + public void addDhtKey( + K key, + byte[] keyBytes, + IgniteTxEntry<K, V> writeEntry, + @Nullable GridCacheVersion drVer, + boolean invalidateEntry, + GridCacheContext<K, V> ctx + ) throws IgniteCheckedException { + invalidateEntries.set(idx, invalidateEntry); + + addKeyBytes(key, keyBytes, writeEntry, false, null, drVer, ctx); + } + + /** + * Marks last added key for preloading. + */ + public void markLastKeyForPreload() { + assert idx > 0; + + if (preloadKeys == null) + preloadKeys = new BitSet(); + + preloadKeys.set(idx - 1, true); + } + + /** + * @param idx Key index. + * @return {@code True} if need to preload key with given index. + */ + public boolean needPreloadKey(int idx) { + return preloadKeys != null && preloadKeys.get(idx); + } + + /** + * Sets owner and its mapped version. + * + * @param key Key. + * @param keyBytes Key bytes. + * @param ownerMapped Owner mapped version. + */ + public void owned(K key, byte[] keyBytes, GridCacheVersion ownerMapped) { + if (owned == null) + owned = new GridLeanMap<>(3); + + owned.put(key, ownerMapped); + } + + /** + * @param key Key. + * @return Owner and its mapped versions. + */ + @Nullable public GridCacheVersion owned(K key) { + return owned == null ? null : owned.get(key); + } + + /** + * @param idx Entry index to check. + * @return {@code True} if near entry should be invalidated. + */ + public boolean invalidateNearEntry(int idx) { + return invalidateEntries.get(idx); + } + + /** + * @return Mini ID. + */ + public IgniteUuid miniId() { + return miniId; + } + + /** + * @return TTL for read operation. + */ + public long accessTtl() { + return accessTtl; + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + assert F.isEmpty(nearKeys) || !F.isEmpty(nearKeyBytes); + + if (owned != null) + ownedBytes = CU.marshal(ctx, owned); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (nearKeys == null && nearKeyBytes != null) + nearKeys = unmarshalCollection(nearKeyBytes, ctx, ldr); + + if (ownedBytes != null) + owned = ctx.marshaller().unmarshal(ownedBytes, ldr); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDhtLockRequest _clone = new GridDhtLockRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDhtLockRequest _clone = (GridDhtLockRequest)_msg; + + _clone.nearKeys = nearKeys; + _clone.nearKeyBytes = nearKeyBytes; + _clone.invalidateEntries = invalidateEntries; + _clone.miniId = miniId; + _clone.owned = owned; + _clone.ownedBytes = ownedBytes; + _clone.topVer = topVer; + _clone.subjId = subjId; + _clone.taskNameHash = taskNameHash; + _clone.preloadKeys = preloadKeys; + _clone.accessTtl = accessTtl; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 24: + if (!commState.putBitSet(invalidateEntries)) + return false; + + commState.idx++; + + case 25: + if (!commState.putGridUuid(miniId)) + return false; + + commState.idx++; + + case 26: + if (nearKeyBytes != null) { + if (commState.it == null) { + if (!commState.putInt(nearKeyBytes.size())) + return false; + + commState.it = nearKeyBytes.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + if (!commState.putByteArray((byte[])commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + case 27: + if (!commState.putByteArray(ownedBytes)) + return false; + + commState.idx++; + + case 28: + if (!commState.putLong(topVer)) + return false; + + commState.idx++; + + case 29: + if (!commState.putUuid(subjId)) + return false; + + commState.idx++; + + case 30: + if (!commState.putInt(taskNameHash)) + return false; + + commState.idx++; + + case 31: + if (!commState.putBitSet(preloadKeys)) + return false; + + commState.idx++; + + case 32: + if (!commState.putLong(accessTtl)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 24: + BitSet invalidateEntries0 = commState.getBitSet(); + + if (invalidateEntries0 == BIT_SET_NOT_READ) + return false; + + invalidateEntries = invalidateEntries0; + + commState.idx++; + + case 25: + IgniteUuid miniId0 = commState.getGridUuid(); + + if (miniId0 == GRID_UUID_NOT_READ) + return false; + + miniId = miniId0; + + commState.idx++; + + case 26: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (nearKeyBytes == null) + nearKeyBytes = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + byte[] _val = commState.getByteArray(); + + if (_val == BYTE_ARR_NOT_READ) + return false; + + nearKeyBytes.add((byte[])_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 27: + byte[] ownedBytes0 = commState.getByteArray(); + + if (ownedBytes0 == BYTE_ARR_NOT_READ) + return false; + + ownedBytes = ownedBytes0; + + commState.idx++; + + case 28: + if (buf.remaining() < 8) + return false; + + topVer = commState.getLong(); + + commState.idx++; + + case 29: + UUID subjId0 = commState.getUuid(); + + if (subjId0 == UUID_NOT_READ) + return false; + + subjId = subjId0; + + commState.idx++; + + case 30: + if (buf.remaining() < 4) + return false; + + taskNameHash = commState.getInt(); + + commState.idx++; + + case 31: + BitSet preloadKeys0 = commState.getBitSet(); + + if (preloadKeys0 == BIT_SET_NOT_READ) + return false; + + preloadKeys = preloadKeys0; + + commState.idx++; + + case 32: + if (buf.remaining() < 8) + return false; + + accessTtl = commState.getLong(); + + commState.idx++; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 29; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtLockRequest.class, this, "nearKeyBytesSize", nearKeyBytes.size(), + "super", super.toString()); + } +}