http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 99f5128..08804ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -75,7 +75,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem private final GridCacheOperation op; /** Keys */ - private Collection<Object> keys; + private Collection<?> keys; /** Values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) @@ -86,7 +86,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem /** Conflict put values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private Collection<GridCacheDrInfo<Object>> conflictPutVals; + private Collection<GridCacheDrInfo<?>> conflictPutVals; /** Conflict remove values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) @@ -184,10 +184,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem GridDhtAtomicCache cache, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, - Collection<Object> keys, + Collection<?> keys, @Nullable Collection<?> vals, @Nullable Object[] invokeArgs, - @Nullable Collection<GridCacheDrInfo<Object>> conflictPutVals, + @Nullable Collection<GridCacheDrInfo<?>> conflictPutVals, @Nullable Collection<GridCacheVersion> conflictRmvVals, final boolean retval, final boolean rawRetval, @@ -258,7 +258,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem } /** {@inheritDoc} */ - @Override public Collection<Object> keys() { + @Override public Collection<?> keys() { return keys; } @@ -526,7 +526,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem } else if (conflictPutVals != null) { // Conflict PUT. - GridCacheDrInfo<Object> conflictPutVal = F.first(conflictPutVals); + GridCacheDrInfo<?> conflictPutVal = F.first(conflictPutVals); val = conflictPutVal.value(); conflictVer = conflictPutVal.version(); @@ -609,7 +609,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem if (vals != null) it = vals.iterator(); - Iterator<GridCacheDrInfo<Object>> conflictPutValsIt = null; + Iterator<GridCacheDrInfo<?>> conflictPutValsIt = null; if (conflictPutVals != null) conflictPutValsIt = conflictPutVals.iterator(); @@ -657,7 +657,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem } } else if (conflictPutVals != null) { - GridCacheDrInfo<Object> conflictPutVal = conflictPutValsIt.next(); + GridCacheDrInfo<?> conflictPutVal = conflictPutValsIt.next(); val = conflictPutVal.value(); conflictVer = conflictPutVal.version();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 0d3e996..f3560d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -378,6 +378,26 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr err.addSuppressed(e); } + /** + * Adds keys to collection of failed keys. + * + * @param keys Key to add. + * @param e Error cause. + * @param ctx Context. + */ + public synchronized void addFailedKeys(Collection<Object> keys, Throwable e, GridCacheContext ctx) { + if (failedKeys == null) + failedKeys = new ArrayList<>(keys.size()); + + for (Object key : keys) + failedKeys.add(ctx.toCacheKeyObject(key)); + + if (err == null) + err = new IgniteCheckedException("Failed to update keys on primary node."); + + err.addSuppressed(e); + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 47d4ffe..3c09c56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -69,7 +69,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param ctx Cache context. * @param map Cache map. */ - public GridDhtColocatedCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) { + public GridDhtColocatedCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { super(ctx, map); } @@ -80,11 +80,18 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte /** {@inheritDoc} */ @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { + map.setEntryFactory(new GridCacheMapEntryFactory() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, - V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { - return new GridDhtColocatedCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId); + @Override public GridCacheMapEntry create(GridCacheContext ctx, + long topVer, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, + int hdrId) + { + return new GridDhtColocatedCacheEntry(ctx, topVer, key, hash, val, next, ttl, hdrId); } }); } @@ -93,14 +100,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse<K, V>>() { - @Override public void apply(UUID nodeId, GridNearGetResponse<K, V> res) { + ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + @Override public void apply(UUID nodeId, GridNearGetResponse res) { processGetResponse(nodeId, res); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse<K, V>>() { - @Override public void apply(UUID nodeId, GridNearLockResponse<K, V> res) { + ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { + @Override public void apply(UUID nodeId, GridNearLockResponse res) { processLockResponse(nodeId, res); } }); @@ -118,9 +125,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @throws GridDhtInvalidPartitionException If {@code allowDetached} is false and node is not primary * for given key. */ - public GridDistributedCacheEntry<K, V> entryExx(K key, long topVer, boolean allowDetached) { + public GridDistributedCacheEntry entryExx(KeyCacheObject key, + long topVer, + boolean allowDetached) + { return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, topVer) ? - new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) : entryExx(key, topVer); + new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0, 0) : entryExx(key, topVer); } /** {@inheritDoc} */ @@ -144,12 +154,16 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte /** {@inheritDoc} */ @Override public boolean isLocked(K key) { - return ctx.mvcc().isLockedByThread(key, -1); + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); + + return ctx.mvcc().isLockedByThread(cacheKey, -1); } /** {@inheritDoc} */ @Override public boolean isLockedByThread(K key) { - return ctx.mvcc().isLockedByThread(key, Thread.currentThread().getId()); + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); + + return ctx.mvcc().isLockedByThread(cacheKey, Thread.currentThread().getId()); } /** {@inheritDoc} */ @@ -169,12 +183,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (F.isEmpty(keys)) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); - IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(); + IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(); if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { - @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter<K, V> tx) { - return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, entry, deserializePortable, skipVals)); + @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) { + return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, entry, deserializePortable, skipVals)); } }); } @@ -199,7 +213,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** {@inheritDoc} */ - @Override protected GridCacheEntryEx entryExSafe(K key, long topVer) { + @Override protected GridCacheEntryEx entryExSafe(KeyCacheObject key, long topVer) { try { return ctx.affinity().localNode(key, topVer) ? entryEx(key) : null; } @@ -253,15 +267,17 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte GridCacheEntryEx entry = null; + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); + while (true) { try { - entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key); + entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey); // If our DHT cache do has value, then we peek it. if (entry != null) { boolean isNew = entry.isNewLocked(); - V v = entry.innerGet(null, + CacheObject v = entry.innerGet(null, /*swap*/true, /*read-through*/false, /*fail-fast*/true, @@ -279,15 +295,17 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte GridCacheVersion obsoleteVer = context().versions().next(); if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) - removeIfObsolete(key); + removeIfObsolete(cacheKey); success = false; } else { + Object val = v.value(ctx); + if (ctx.portableEnabled() && !skipVals) - v = (V)ctx.unwrapPortableIfNeeded(v, !deserializePortable); + val = ctx.unwrapPortableIfNeeded(val, !deserializePortable); - locVals.put(key, (V)CU.skipValue(v, skipVals)); + locVals.put(key, (V)CU.skipValue(val, skipVals)); } } else @@ -359,7 +377,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Override public IgniteInternalFuture<Boolean> lockAllAsync( Collection<? extends K> keys, long timeout, - @Nullable IgniteTxLocalEx<K, V> tx, + @Nullable IgniteTxLocalEx tx, boolean isInvalidate, boolean isRead, boolean retval, @@ -369,7 +387,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte ) { assert tx == null || tx instanceof GridNearTxLocal; - GridNearTxLocal<K, V> txx = (GridNearTxLocal<K, V>)tx; + GridNearTxLocal txx = (GridNearTxLocal)tx; GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx, keys, @@ -409,19 +427,23 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte int keyCnt = -1; - Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null; + Map<ClusterNode, GridNearUnlockRequest> map = null; - Collection<K> locKeys = new ArrayList<>(); + Collection<KeyCacheObject> locKeys = new ArrayList<>(); for (K key : keys) { - GridDistributedCacheEntry<K, V> entry = peekExx(key); + // TODO IGNITE-51. + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); + + GridDistributedCacheEntry entry = peekExx(cacheKey); - Cache.Entry<K, V> Entry = entry == null ? entry(key) : entry.wrapLazyValue(); + Cache.Entry<K, V> Entry = entry == null ? entry(key) : entry.<K, V>wrapLazyValue(); if (!ctx.isAll(Entry, filter)) break; // While. - GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), key, null); + GridCacheMvccCandidate lock = + ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), cacheKey, null); if (lock != null) { final long topVer = lock.topologyVersion(); @@ -448,20 +470,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte "then they need to be unlocked separately): " + keys); if (!primary.isLocal()) { - GridNearUnlockRequest<K, V> req = map.get(primary); + GridNearUnlockRequest req = map.get(primary); if (req == null) { - map.put(primary, req = new GridNearUnlockRequest<>(ctx.cacheId(), keyCnt)); + map.put(primary, req = new GridNearUnlockRequest(ctx.cacheId(), keyCnt)); req.version(ver); } byte[] keyBytes = entry != null ? entry.getOrMarshalKeyBytes() : CU.marshal(ctx.shared(), key); - req.addKey(key, keyBytes, ctx); + req.addKey(cacheKey, keyBytes, ctx); } else - locKeys.add(key); + locKeys.add(cacheKey); if (log.isDebugEnabled()) log.debug("Removed lock (will distribute): " + lock); @@ -478,10 +500,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (!locKeys.isEmpty()) removeLocks(ctx.localNodeId(), ver, locKeys, true); - for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping : map.entrySet()) { + for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) { ClusterNode n = mapping.getKey(); - GridDistributedUnlockRequest<K, V> req = mapping.getValue(); + GridDistributedUnlockRequest req = mapping.getValue(); assert !n.isLocal(); @@ -510,12 +532,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte try { int keyCnt = -1; - Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null; + Map<ClusterNode, GridNearUnlockRequest> map = null; - Collection<K> locKeys = new LinkedList<>(); + Collection<KeyCacheObject> locKeys = new LinkedList<>(); for (K key : keys) { - GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, key, ver); + // TODO IGNITE-51. + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); + + GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, cacheKey, ver); if (lock != null) { long topVer = lock.topologyVersion(); @@ -532,22 +557,22 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (!primary.isLocal()) { // Send request to remove from remote nodes. - GridNearUnlockRequest<K, V> req = map.get(primary); + GridNearUnlockRequest req = map.get(primary); if (req == null) { - map.put(primary, req = new GridNearUnlockRequest<>(ctx.cacheId(), keyCnt)); + map.put(primary, req = new GridNearUnlockRequest(ctx.cacheId(), keyCnt)); req.version(ver); } - GridCacheEntryEx entry = peekEx(key); + GridCacheEntryEx entry = peekEx(cacheKey); byte[] keyBytes = entry != null ? entry.getOrMarshalKeyBytes() : CU.marshal(ctx.shared(), key); - req.addKey(key, keyBytes, ctx); + req.addKey(cacheKey, keyBytes, ctx); } else - locKeys.add(key); + locKeys.add(cacheKey); } } @@ -560,10 +585,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver); Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver); - for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping : map.entrySet()) { + for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) { ClusterNode n = mapping.getKey(); - GridDistributedUnlockRequest<K, V> req = mapping.getValue(); + GridDistributedUnlockRequest req = mapping.getValue(); if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys())) { req.completedVersions(committed, rolledback); @@ -593,7 +618,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte */ IgniteInternalFuture<Exception> lockAllAsync( final GridCacheContext<K, V> cacheCtx, - @Nullable final GridNearTxLocal<K, V> tx, + @Nullable final GridNearTxLocal tx, final long threadId, final GridCacheVersion ver, final long topVer, @@ -605,7 +630,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte ) { assert keys != null; - IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); + // TODO IGNITE-51. + // IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); + IgniteInternalFuture<Object> keyFut = null; // Prevent embedded future creation if possible. if (keyFut.isDone()) { @@ -666,7 +693,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte */ private IgniteInternalFuture<Exception> lockAllAsync0( GridCacheContext<K, V> cacheCtx, - @Nullable final GridNearTxLocal<K, V> tx, + @Nullable final GridNearTxLocal tx, long threadId, final GridCacheVersion ver, final long topVer, @@ -700,8 +727,11 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (timedout) break; + // TODO IGNITE-51. + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); + while (true) { - GridDhtCacheEntry<K, V> entry = entryExx(key, topVer); + GridDhtCacheEntry entry = entryExx(cacheKey, topVer); try { fut.addEntry(key == null ? null : entry); @@ -751,7 +781,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (log.isDebugEnabled()) log.debug("Performing colocated lock [tx=" + tx + ", keys=" + keys + ']'); - IgniteInternalFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx, + IgniteInternalFuture<GridCacheReturn<Object>> txFut = tx.lockAllAsync(cacheCtx, keys, tx.implicit(), txRead, @@ -760,8 +790,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte return new GridDhtEmbeddedFuture<>( ctx.kernalContext(), txFut, - new C2<GridCacheReturn<V>, Exception, Exception>() { - @Override public Exception apply(GridCacheReturn<V> ret, + new C2<GridCacheReturn<Object>, Exception, Exception>() { + @Override public Exception apply(GridCacheReturn<Object> ret, Exception e) { if (e != null) e = U.unwrap(e); @@ -778,7 +808,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param nodeId Sender ID. * @param res Response. */ - private void processGetResponse(UUID nodeId, GridNearGetResponse<K, V> res) { + private void processGetResponse(UUID nodeId, GridNearGetResponse res) { GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future( res.version(), res.futureId()); @@ -796,7 +826,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param nodeId Node ID. * @param res Response. */ - private void processLockResponse(UUID nodeId, GridNearLockResponse<K, V> res) { + private void processLockResponse(UUID nodeId, GridNearLockResponse res) { assert nodeId != null; assert res != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 722b4e1..ee1e34a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -99,14 +99,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity /** Transaction. */ @GridToStringExclude - private GridNearTxLocal<K, V> tx; + private GridNearTxLocal tx; /** Topology snapshot to operate on. */ private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>(); /** Map of current values. */ - private Map<K, GridTuple3<GridCacheVersion, V, byte[]>> valMap; + private Map<KeyCacheObject, GridTuple3<GridCacheVersion, CacheObject, byte[]>> valMap; /** Trackable flag (here may be non-volatile). */ private boolean trackable; @@ -134,7 +134,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity public GridDhtColocatedLockFuture( GridCacheContext<K, V> cctx, Collection<? extends K> keys, - @Nullable GridNearTxLocal<K, V> tx, + @Nullable GridNearTxLocal tx, boolean read, boolean retval, long timeout, @@ -264,13 +264,13 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * implicit transaction accesses locked entry. * @throws IgniteCheckedException If failed to add entry due to external locking. */ - @Nullable private GridCacheMvccCandidate addEntry(GridDistributedCacheEntry<K, V> entry) throws IgniteCheckedException { + @Nullable private GridCacheMvccCandidate addEntry(GridDistributedCacheEntry entry) throws IgniteCheckedException { GridCacheMvccCandidate cand = cctx.mvcc().explicitLock(threadId, entry.key()); if (inTx()) { IgniteTxEntry txEntry = tx.entry(entry.txKey()); - txEntry.cached(entry, txEntry.keyBytes()); + txEntry.cached(entry, null); if (cand != null) { if (!tx.implicit()) @@ -281,7 +281,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } else { // Check transaction entries (corresponding tx entries must be enlisted in transaction). - cand = new GridCacheMvccCandidate<>(entry, + cand = new GridCacheMvccCandidate(entry, cctx.localNodeId(), null, null, @@ -300,7 +300,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } else { if (cand == null) { - cand = new GridCacheMvccCandidate<>(entry, + cand = new GridCacheMvccCandidate(entry, cctx.localNodeId(), null, null, @@ -399,7 +399,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @param nodeId Sender. * @param res Result. */ - void onResult(UUID nodeId, GridNearLockResponse<K, V> res) { + void onResult(UUID nodeId, GridNearLockResponse res) { if (!isDone()) { if (log.isDebugEnabled()) log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + @@ -535,7 +535,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity // Continue mapping on the same topology version as it was before. topSnapshot.compareAndSet(null, snapshot); - map(keys); + // TODO IGNITE-51. + // map(keys); markInitialized(); @@ -568,7 +569,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity topSnapshot.compareAndSet(null, snapshot); - map(keys); + // TODO IGNITE-51. + // map(keys); markInitialized(); } @@ -590,13 +592,85 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } /** + * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to + * remote primary node. + * + * @param mappings Queue of mappings. + * @throws IgniteCheckedException If mapping can not be completed. + */ + private void proceedMapping(final Deque<GridNearLockMapping> mappings) + throws IgniteCheckedException { + GridNearLockMapping map = mappings.poll(); + + // If there are no more mappings to process, complete the future. + if (map == null) + return; + + final GridNearLockRequest req = map.request(); + final Collection<KeyCacheObject> mappedKeys = map.distributedKeys(); + final ClusterNode node = map.node(); + + if (filter != null && filter.length != 0) + req.filter((IgnitePredicate[])filter, cctx); + + if (node.isLocal()) + lockLocally(mappedKeys, req.topologyVersion(), mappings); + else { + final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings); + + req.miniId(fut.futureId()); + + add(fut); // Append new future. + + IgniteInternalFuture<?> txSync = null; + + if (inTx()) + txSync = cctx.tm().awaitFinishAckAsync(node.id(), tx.threadId()); + + if (txSync == null || txSync.isDone()) { + try { + if (log.isDebugEnabled()) + log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); + + cctx.io().send(node, req, cctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException ex) { + assert fut != null; + + fut.onResult(ex); + } + } + else { + txSync.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + try { + if (log.isDebugEnabled()) + log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); + + cctx.io().send(node, req, cctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException ex) { + assert fut != null; + + fut.onResult(ex); + } + catch (IgniteCheckedException e) { + onError(e); + } + } + }); + } + } + } + + /** * Maps keys to nodes. Note that we can not simply group keys by nodes and send lock request as * such approach does not preserve order of lock acquisition. Instead, keys are split in continuous * groups belonging to one primary node and locks for these groups are acquired sequentially. * * @param keys Keys. */ - private void map(Collection<? extends K> keys) { + private void map(Collection<KeyCacheObject> keys) { try { GridDiscoveryTopologySnapshot snapshot = topSnapshot.get(); @@ -616,13 +690,13 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (mapAsPrimary(keys, topVer)) return; - ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings = new ConcurrentLinkedDeque8<>(); + ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>(); // Assign keys to primary nodes. - GridNearLockMapping<K, V> map = null; + GridNearLockMapping map = null; - for (K key : keys) { - GridNearLockMapping<K, V> updated = map(key, map, topVer); + for (KeyCacheObject key : keys) { + GridNearLockMapping updated = map(key, map, topVer); // If new mapping was created, add to collection. if (updated != map) { @@ -648,27 +722,27 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity boolean hasRmtNodes = false; // Create mini futures. - for (Iterator<GridNearLockMapping<K, V>> iter = mappings.iterator(); iter.hasNext(); ) { - GridNearLockMapping<K, V> mapping = iter.next(); + for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) { + GridNearLockMapping mapping = iter.next(); ClusterNode node = mapping.node(); - Collection<K> mappedKeys = mapping.mappedKeys(); + Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys(); boolean loc = node.equals(cctx.localNode()); assert !mappedKeys.isEmpty(); - GridNearLockRequest<K, V> req = null; + GridNearLockRequest req = null; - Collection<K> distributedKeys = new ArrayList<>(mappedKeys.size()); + Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size()); - for (K key : mappedKeys) { + for (KeyCacheObject key : mappedKeys) { boolean explicit; - IgniteTxKey<K> txKey = cctx.txKey(key); + IgniteTxKey txKey = cctx.txKey(key); while (true) { - GridDistributedCacheEntry<K, V> entry = null; + GridDistributedCacheEntry entry = null; try { entry = cctx.colocated().entryExx(key, topVer, true); @@ -687,8 +761,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity GridCacheMvccCandidate cand = addEntry(entry); // Will either return value from dht cache or null if this is a miss. - GridTuple3<GridCacheVersion, V, byte[]> val = entry.detached() ? null : - ((GridDhtCacheEntry<K, V>)entry).versionedValue(topVer); + GridTuple3<GridCacheVersion, CacheObject, byte[]> val = entry.detached() ? null : + ((GridDhtCacheEntry)entry).versionedValue(topVer); GridCacheVersion dhtVer = null; @@ -700,7 +774,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (cand != null && !cand.reentry()) { if (req == null) { - req = new GridNearLockRequest<>( + req = new GridNearLockRequest( cctx.cacheId(), topVer, cctx.nodeId(), @@ -792,86 +866,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } /** - * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to - * remote primary node. - * - * @param mappings Queue of mappings. - * @throws IgniteCheckedException If mapping can not be completed. - */ - private void proceedMapping(final Deque<GridNearLockMapping<K, V>> mappings) - throws IgniteCheckedException { - GridNearLockMapping<K, V> map = mappings.poll(); - - // If there are no more mappings to process, complete the future. - if (map == null) - return; - - final GridNearLockRequest<K, V> req = map.request(); - final Collection<K> mappedKeys = map.distributedKeys(); - final ClusterNode node = map.node(); - - if (filter != null && filter.length != 0) - req.filter(filter, cctx); - - if (node.isLocal()) - lockLocally(mappedKeys, req.topologyVersion(), mappings); - else { - final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings); - - req.miniId(fut.futureId()); - - add(fut); // Append new future. - - IgniteInternalFuture<?> txSync = null; - - if (inTx()) - txSync = cctx.tm().awaitFinishAckAsync(node.id(), tx.threadId()); - - if (txSync == null || txSync.isDone()) { - try { - if (log.isDebugEnabled()) - log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - - cctx.io().send(node, req, cctx.ioPolicy()); - } - catch (ClusterTopologyCheckedException ex) { - assert fut != null; - - fut.onResult(ex); - } - } - else { - txSync.listenAsync(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - try { - if (log.isDebugEnabled()) - log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - - cctx.io().send(node, req, cctx.ioPolicy()); - } - catch (ClusterTopologyCheckedException ex) { - assert fut != null; - - fut.onResult(ex); - } - catch (IgniteCheckedException e) { - onError(e); - } - } - }); - } - } - } - - /** * Locks given keys directly through dht cache. * * @param keys Collection of keys. * @param topVer Topology version to lock on. * @param mappings Optional collection of mappings to proceed locking. */ - private void lockLocally(final Collection<K> keys, long topVer, - @Nullable final Deque<GridNearLockMapping<K, V>> mappings) { + private void lockLocally(final Collection<KeyCacheObject> keys, long topVer, + @Nullable final Deque<GridNearLockMapping> mappings) { if (log.isDebugEnabled()) log.debug("Before locally locking keys : " + keys); @@ -880,7 +882,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity threadId, lockVer, topVer, - keys, + // TODO IGNITE-51. + // keys, + null, read, timeout, accessTtl, @@ -913,11 +917,11 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity ", mappedKeys=" + keys + ", fut=" + GridDhtColocatedLockFuture.this + ']'); if (inTx()) { - for (K key : keys) + for (KeyCacheObject key : keys) tx.entry(cctx.txKey(key)).markLocked(); } else { - for (K key : keys) + for (KeyCacheObject key : keys) cctx.mvcc().markExplicitOwner(key, threadId); } @@ -947,14 +951,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @return {@code True} if all keys were mapped locally, {@code false} if full mapping should be performed. * @throws IgniteCheckedException If key cannot be added to mapping. */ - private boolean mapAsPrimary(Collection<? extends K> keys, long topVer) throws IgniteCheckedException { + private boolean mapAsPrimary(Collection<KeyCacheObject> keys, long topVer) throws IgniteCheckedException { // Assign keys to primary nodes. - Collection<K> distributedKeys = new ArrayList<>(keys.size()); + Collection<KeyCacheObject> distributedKeys = new ArrayList<>(keys.size()); - for (K key : keys) { + for (KeyCacheObject key : keys) { if (!cctx.affinity().primary(cctx.localNode(), key, topVer)) { // Remove explicit locks added so far. - for (K k : keys) + for (KeyCacheObject k : keys) cctx.mvcc().removeExplicitLock(threadId, k, lockVer); return false; @@ -973,7 +977,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (!distributedKeys.isEmpty()) { if (tx != null) { - for (K key : distributedKeys) + for (KeyCacheObject key : distributedKeys) tx.addKeyMapping(cctx.txKey(key), cctx.localNode()); } @@ -992,8 +996,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @return {@code True} if transaction accesses key that was explicitly locked before. * @throws IgniteCheckedException If lock is externally held and transaction is explicit. */ - private boolean addLocalKey(K key, long topVer, Collection<K> distributedKeys) throws IgniteCheckedException { - GridDistributedCacheEntry<K, V> entry = cctx.colocated().entryExx(key, topVer, false); + private boolean addLocalKey(KeyCacheObject key, long topVer, Collection<KeyCacheObject> distributedKeys) + throws IgniteCheckedException { + GridDistributedCacheEntry entry = cctx.colocated().entryExx(key, topVer, false); assert !entry.detached(); @@ -1021,7 +1026,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @return Near lock mapping. * @throws IgniteCheckedException If mapping failed. */ - private GridNearLockMapping<K, V> map(K key, @Nullable GridNearLockMapping<K, V> mapping, + private GridNearLockMapping map(KeyCacheObject key, @Nullable GridNearLockMapping mapping, long topVer) throws IgniteCheckedException { assert mapping == null || mapping.node() != null; @@ -1036,7 +1041,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity " key) [key=" + key + ", primaryNodeId=" + primary.id() + ']'); if (mapping == null || !primary.id().equals(mapping.node().id())) - mapping = new GridNearLockMapping<>(primary, key); + mapping = new GridNearLockMapping(primary, key); else mapping.addKey(key); @@ -1097,11 +1102,11 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity /** Keys. */ @GridToStringInclude - private Collection<K> keys; + private Collection<KeyCacheObject> keys; /** Mappings to proceed. */ @GridToStringExclude - private Deque<GridNearLockMapping<K, V>> mappings; + private Deque<GridNearLockMapping> mappings; /** */ private AtomicBoolean rcvRes = new AtomicBoolean(false); @@ -1118,8 +1123,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @param keys Keys. * @param mappings Mappings to proceed. */ - MiniFuture(ClusterNode node, Collection<K> keys, - Deque<GridNearLockMapping<K, V>> mappings) { + MiniFuture(ClusterNode node, + Collection<KeyCacheObject> keys, + Deque<GridNearLockMapping> mappings) { super(cctx.kernalContext()); this.node = node; @@ -1144,7 +1150,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity /** * @return Keys. */ - public Collection<K> keys() { + public Collection<KeyCacheObject> keys() { return keys; } @@ -1188,7 +1194,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity /** * @param res Result callback. */ - void onResult(GridNearLockResponse<K, V> res) { + void onResult(GridNearLockResponse res) { if (rcvRes.compareAndSet(false, true)) { if (res.error() != null) { if (log.isDebugEnabled()) @@ -1206,10 +1212,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity int i = 0; - for (K k : keys) { - GridTuple3<GridCacheVersion, V, byte[]> oldValTup = valMap.get(k); + for (KeyCacheObject k : keys) { + GridTuple3<GridCacheVersion, CacheObject, byte[]> oldValTup = valMap.get(k); - V newVal = res.value(i); + CacheObject newVal = res.value(i); byte[] newBytes = res.valueBytes(i); GridCacheVersion dhtVer = res.dhtVersion(i); @@ -1232,7 +1238,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity txEntry.markLocked(); - GridDhtDetachedCacheEntry<K, V> entry = (GridDhtDetachedCacheEntry<K, V>)txEntry.cached(); + GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); try { if (res.dhtVersion(i) == null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index b1c135a..66efe48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -65,7 +65,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec private IgniteLogger log; /** Keys to request. */ - private Collection<? extends K> keys; + private Collection<KeyCacheObject> keys; /** Keys for which local node is no longer primary. */ private Collection<Integer> invalidParts = new GridLeanSet<>(); @@ -91,7 +91,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param keys Keys. * @param preloader Preloader. */ - public GridDhtForceKeysFuture(GridCacheContext<K, V> cctx, long topVer, Collection<? extends K> keys, + public GridDhtForceKeysFuture(GridCacheContext<K, V> cctx, long topVer, Collection<KeyCacheObject> keys, GridDhtPreloader<K, V> preloader) { super(cctx.kernalContext()); @@ -213,14 +213,14 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param exc Exclude nodes. * @return {@code True} if some mapping was added. */ - private boolean map(Iterable<? extends K> keys, Collection<ClusterNode> exc) { - Map<ClusterNode, Set<K>> mappings = new HashMap<>(); + private boolean map(Iterable<KeyCacheObject> keys, Collection<ClusterNode> exc) { + Map<ClusterNode, Set<KeyCacheObject>> mappings = new HashMap<>(); ClusterNode loc = cctx.localNode(); int curTopVer = topCntr.get(); - for (K key : keys) + for (KeyCacheObject key : keys) map(key, mappings, exc); if (isDone()) @@ -234,9 +234,9 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec trackable = true; // Create mini futures. - for (Map.Entry<ClusterNode, Set<K>> mapped : mappings.entrySet()) { + for (Map.Entry<ClusterNode, Set<KeyCacheObject>> mapped : mappings.entrySet()) { ClusterNode n = mapped.getKey(); - Set<K> mappedKeys = mapped.getValue(); + Set<KeyCacheObject> mappedKeys = mapped.getValue(); int cnt = F.size(mappedKeys); @@ -245,7 +245,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec MiniFuture fut = new MiniFuture(n, mappedKeys, curTopVer, exc); - GridDhtForceKeysRequest req = new GridDhtForceKeysRequest<>( + GridDhtForceKeysRequest req = new GridDhtForceKeysRequest( cctx.cacheId(), futId, fut.miniId(), @@ -282,7 +282,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param exc Exclude nodes. * @param mappings Mappings. */ - private void map(KeyCacheObject key, Map<ClusterNode, Set<K>> mappings, Collection<ClusterNode> exc) { + private void map(KeyCacheObject key, Map<ClusterNode, Set<KeyCacheObject>> mappings, Collection<ClusterNode> exc) { ClusterNode loc = cctx.localNode(); int part = cctx.affinity().partition(key); @@ -318,7 +318,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec } // Create partition. - GridDhtLocalPartition<K, V> locPart = top.localPartition(part, topVer, false); + GridDhtLocalPartition locPart = top.localPartition(part, topVer, false); if (log.isDebugEnabled()) log.debug("Mapping local partition [loc=" + cctx.localNodeId() + ", topVer" + topVer + @@ -346,7 +346,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec return; } - Collection<K> mappedKeys = F.addIfAbsent(mappings, pick, F.<K>newSet()); + Collection<KeyCacheObject> mappedKeys = F.addIfAbsent(mappings, pick, F.<KeyCacheObject>newSet()); assert mappedKeys != null; @@ -380,7 +380,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec private ClusterNode node; /** Requested keys. */ - private Collection<K> keys; + private Collection<KeyCacheObject> keys; /** Topology version for this mini-future. */ private int curTopVer; @@ -404,7 +404,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param curTopVer Topology version for this mini-future. * @param exc Exclude node list. */ - MiniFuture(ClusterNode node, Collection<K> keys, int curTopVer, Collection<ClusterNode> exc) { + MiniFuture(ClusterNode node, Collection<KeyCacheObject> keys, int curTopVer, Collection<ClusterNode> exc) { super(cctx.kernalContext()); assert node != null; @@ -466,7 +466,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param res Result callback. */ void onResult(GridDhtForceKeysResponse res) { - Collection missedKeys = res.missedKeys(); + Collection<KeyCacheObject> missedKeys = res.missedKeys(); boolean remapMissed = false; @@ -479,10 +479,10 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec // If preloading is disabled, we need to check other backups. if (!cctx.preloadEnabled()) { - Collection<K> retryKeys = F.view( + Collection<KeyCacheObject> retryKeys = F.view( keys, F0.notIn(missedKeys), - F0.notIn(F.viewReadOnly(res.forcedInfos(), CU.<K, V>info2Key()))); + F0.notIn(F.viewReadOnly(res.forcedInfos(), CU.<KeyCacheObject, V>info2Key()))); if (!retryKeys.isEmpty()) map(retryKeys, F.concat(false, node, exc)); @@ -495,7 +495,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec for (GridCacheEntryInfo info : res.forcedInfos()) { int p = cctx.affinity().partition(info.key()); - GridDhtLocalPartition<K, V> locPart = top.localPartition(p, -1, false); + GridDhtLocalPartition locPart = top.localPartition(p, -1, false); if (locPart != null && locPart.state() == MOVING && locPart.reserve()) { GridCacheEntryEx entry = cctx.dht().entryEx(info.key()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 9448fa9..e7b8f9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -54,7 +54,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*; @SuppressWarnings("NonConstantFieldWithUpperCaseName") public class GridDhtPartitionDemandPool<K, V> { /** Dummy message to wake up a blocking queue if a node leaves. */ - private final SupplyMessage<K, V> DUMMY_TOP = new SupplyMessage<>(); + private final SupplyMessage DUMMY_TOP = new SupplyMessage(); /** */ private final GridCacheContext<K, V> cctx; @@ -70,7 +70,7 @@ public class GridDhtPartitionDemandPool<K, V> { private final Collection<DemandWorker> dmdWorkers; /** Preload predicate. */ - private IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred; + private IgnitePredicate<GridCacheEntryInfo> preloadPred; /** Future for preload mode {@link org.apache.ignite.cache.CachePreloadMode#SYNC}. */ @GridToStringInclude @@ -172,7 +172,7 @@ public class GridDhtPartitionDemandPool<K, V> { * * @param preloadPred Preload predicate. */ - void preloadPredicate(IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred) { + void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) { this.preloadPred = preloadPred; } @@ -261,7 +261,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @param msg Message to check. * @return {@code True} if dummy message. */ - private boolean dummyTopology(SupplyMessage<K, V> msg) { + private boolean dummyTopology(SupplyMessage msg) { return msg == DUMMY_TOP; } @@ -401,7 +401,7 @@ public class GridDhtPartitionDemandPool<K, V> { private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>(); /** Message queue. */ - private final LinkedBlockingDeque<SupplyMessage<K, V>> msgQ = + private final LinkedBlockingDeque<SupplyMessage> msgQ = new LinkedBlockingDeque<>(); /** Counter. */ @@ -443,7 +443,7 @@ public class GridDhtPartitionDemandPool<K, V> { /** * @param msg Message. */ - private void addMessage(SupplyMessage<K, V> msg) { + private void addMessage(SupplyMessage msg) { if (!enterBusy()) return; @@ -481,7 +481,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @return {@code False} if partition has become invalid during preloading. * @throws IgniteInterruptedCheckedException If interrupted. */ - private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo<K, V> entry, long topVer) + private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo entry, long topVer) throws IgniteCheckedException { try { GridCacheEntryEx cached = null; @@ -506,7 +506,7 @@ public class GridDhtPartitionDemandPool<K, V> { if (preloadPred == null || preloadPred.apply(entry)) { if (cached.initialValue( entry.value(), - entry.valueBytes(), + null, entry.version(), entry.ttl(), entry.expireTime(), @@ -569,7 +569,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @throws ClusterTopologyCheckedException If node left. * @throws IgniteCheckedException If failed to send message. */ - private Set<Integer> demandFromNode(ClusterNode node, final long topVer, GridDhtPartitionDemandMessage<K, V> d, + private Set<Integer> demandFromNode(ClusterNode node, final long topVer, GridDhtPartitionDemandMessage d, GridDhtPartitionsExchangeFuture<K, V> exchFut) throws InterruptedException, IgniteCheckedException { GridDhtPartitionTopology<K, V> top = cctx.dht().topology(); @@ -589,9 +589,9 @@ public class GridDhtPartitionDemandPool<K, V> { if (isCancelled() || topologyChanged()) return missed; - cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage<K, V>>() { - @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage<K, V> msg) { - addMessage(new SupplyMessage<>(nodeId, msg)); + cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() { + @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) { + addMessage(new SupplyMessage(nodeId, msg)); } }); @@ -604,7 +604,7 @@ public class GridDhtPartitionDemandPool<K, V> { retry = false; // Create copy. - d = new GridDhtPartitionDemandMessage<>(d, remaining); + d = new GridDhtPartitionDemandMessage(d, remaining); long timeout = GridDhtPartitionDemandPool.this.timeout.get(); @@ -619,7 +619,7 @@ public class GridDhtPartitionDemandPool<K, V> { // While. // ===== while (!isCancelled() && !topologyChanged()) { - SupplyMessage<K, V> s = poll(msgQ, timeout, this); + SupplyMessage s = poll(msgQ, timeout, this); // If timed out. if (s == null) { @@ -634,17 +634,17 @@ public class GridDhtPartitionDemandPool<K, V> { cctx.io().removeOrderedHandler(d.topic()); // Must create copy to be able to work with IO manager thread local caches. - d = new GridDhtPartitionDemandMessage<>(d, remaining); + d = new GridDhtPartitionDemandMessage(d, remaining); // Create new topic. d.topic(topic(++cntr)); // Create new ordered listener. cctx.io().addOrderedHandler(d.topic(), - new CI2<UUID, GridDhtPartitionSupplyMessage<K, V>>() { + new CI2<UUID, GridDhtPartitionSupplyMessage>() { @Override public void apply(UUID nodeId, - GridDhtPartitionSupplyMessage<K, V> msg) { - addMessage(new SupplyMessage<>(nodeId, msg)); + GridDhtPartitionSupplyMessage msg) { + addMessage(new SupplyMessage(nodeId, msg)); } }); @@ -676,7 +676,7 @@ public class GridDhtPartitionDemandPool<K, V> { if (log.isDebugEnabled()) log.debug("Received supply message: " + s); - GridDhtPartitionSupplyMessage<K, V> supply = s.supply(); + GridDhtPartitionSupplyMessage supply = s.supply(); // Check whether there were class loading errors on unmarshal if (supply.classError() != null) { @@ -690,11 +690,11 @@ public class GridDhtPartitionDemandPool<K, V> { } // Preload. - for (Map.Entry<Integer, Collection<GridCacheEntryInfo<K, V>>> e : supply.infos().entrySet()) { + for (Map.Entry<Integer, Collection<GridCacheEntryInfo>> e : supply.infos().entrySet()) { int p = e.getKey(); if (cctx.affinity().localNode(p, topVer)) { - GridDhtLocalPartition<K, V> part = top.localPartition(p, topVer, true); + GridDhtLocalPartition part = top.localPartition(p, topVer, true); assert part != null; @@ -710,7 +710,7 @@ public class GridDhtPartitionDemandPool<K, V> { Collection<Integer> invalidParts = new GridLeanSet<>(); // Loop through all received entries and try to preload them. - for (GridCacheEntryInfo<K, V> entry : e.getValue()) { + for (GridCacheEntryInfo entry : e.getValue()) { if (!invalidParts.contains(p)) { if (!part.preloadingPermitted(entry.key(), entry.version())) { if (log.isDebugEnabled()) @@ -799,7 +799,7 @@ public class GridDhtPartitionDemandPool<K, V> { */ private void drainQueue() throws InterruptedException { while (!msgQ.isEmpty()) { - SupplyMessage<K, V> msg = msgQ.take(); + SupplyMessage msg = msgQ.take(); if (log.isDebugEnabled()) log.debug("Drained supply message: " + msg); @@ -885,7 +885,7 @@ public class GridDhtPartitionDemandPool<K, V> { if (topologyChanged() || isCancelled()) break; // For. - GridDhtPartitionDemandMessage<K, V> d = assigns.remove(node); + GridDhtPartitionDemandMessage d = assigns.remove(node); // If another thread is already processing this message, // move to the next node. @@ -997,7 +997,7 @@ public class GridDhtPartitionDemandPool<K, V> { // If partition belongs to local node. if (cctx.affinity().localNode(p, topVer)) { - GridDhtLocalPartition<K, V> part = top.localPartition(p, topVer, true); + GridDhtLocalPartition part = top.localPartition(p, topVer, true); assert part != null; assert part.id() == p; @@ -1020,10 +1020,10 @@ public class GridDhtPartitionDemandPool<K, V> { else { ClusterNode n = F.first(picked); - GridDhtPartitionDemandMessage<K, V> msg = assigns.get(n); + GridDhtPartitionDemandMessage msg = assigns.get(n); if (msg == null) { - assigns.put(n, msg = new GridDhtPartitionDemandMessage<>( + assigns.put(n, msg = new GridDhtPartitionDemandMessage( top.updateSequence(), exchFut.exchangeId().topologyVersion(), cctx.cacheId())); @@ -1088,12 +1088,12 @@ public class GridDhtPartitionDemandPool<K, V> { /** * Supply message wrapper. */ - private static class SupplyMessage<K, V> { + private static class SupplyMessage { /** Sender ID. */ private UUID sndId; /** Supply message. */ - private GridDhtPartitionSupplyMessage<K, V> supply; + private GridDhtPartitionSupplyMessage supply; /** * Dummy constructor. @@ -1106,7 +1106,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @param sndId Sender ID. * @param supply Supply message. */ - SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage<K, V> supply) { + SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) { this.sndId = sndId; this.supply = supply; } @@ -1121,7 +1121,7 @@ public class GridDhtPartitionDemandPool<K, V> { /** * @return Message. */ - GridDhtPartitionSupplyMessage<K, V> supply() { + GridDhtPartitionSupplyMessage supply() { return supply; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index c141594..6a3e48d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@ -60,13 +60,13 @@ class GridDhtPartitionSupplyPool<K, V> { private final Collection<SupplyWorker> workers = new LinkedList<>(); /** */ - private final BlockingQueue<DemandMessage<K, V>> queue = new LinkedBlockingDeque<>(); + private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>(); /** */ private final boolean depEnabled; /** Preload predicate. */ - private IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred; + private IgnitePredicate<GridCacheEntryInfo> preloadPred; /** * @param cctx Cache context. @@ -88,8 +88,8 @@ class GridDhtPartitionSupplyPool<K, V> { for (int i = 0; i < poolSize; i++) workers.add(new SupplyWorker()); - cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage<K, V>>() { - @Override public void apply(UUID id, GridDhtPartitionDemandMessage<K, V> m) { + cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() { + @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { processDemandMessage(id, m); } }); @@ -120,7 +120,7 @@ class GridDhtPartitionSupplyPool<K, V> { * * @param preloadPred Preload predicate. */ - void preloadPredicate(IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred) { + void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) { this.preloadPred = preloadPred; } @@ -148,7 +148,7 @@ class GridDhtPartitionSupplyPool<K, V> { * @param nodeId Sender node ID. * @param d Message. */ - private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage<K, V> d) { + private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) { if (!enterBusy()) return; @@ -157,7 +157,7 @@ class GridDhtPartitionSupplyPool<K, V> { if (log.isDebugEnabled()) log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']'); - queue.offer(new DemandMessage<>(nodeId, d)); + queue.offer(new DemandMessage(nodeId, d)); } else U.warn(log, "Received partition demand message when preloading is disabled (will ignore): " + d); @@ -212,7 +212,7 @@ class GridDhtPartitionSupplyPool<K, V> { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { while (!isCancelled()) { - DemandMessage<K, V> msg = poll(queue, this); + DemandMessage msg = poll(queue, this); if (msg == null) continue; @@ -234,13 +234,13 @@ class GridDhtPartitionSupplyPool<K, V> { * @param msg Message. * @param node Demander. */ - private void processMessage(DemandMessage<K, V> msg, ClusterNode node) { + private void processMessage(DemandMessage msg, ClusterNode node) { assert msg != null; assert node != null; - GridDhtPartitionDemandMessage<K, V> d = msg.message(); + GridDhtPartitionDemandMessage d = msg.message(); - GridDhtPartitionSupplyMessage<K, V> s = new GridDhtPartitionSupplyMessage<>(d.workerId(), + GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), cctx.cacheId()); long preloadThrottle = cctx.config().getPreloadThrottle(); @@ -256,7 +256,7 @@ class GridDhtPartitionSupplyPool<K, V> { cctx.mvcc().finishLocks(d.topologyVersion()).get(); for (Integer part : d.partitions()) { - GridDhtLocalPartition<K, V> loc = top.localPartition(part, d.topologyVersion(), false); + GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); if (loc == null || loc.state() != OWNING || !loc.reserve()) { // Reply with partition of "-1" to let sender know that @@ -270,11 +270,11 @@ class GridDhtPartitionSupplyPool<K, V> { continue; } - GridCacheEntryInfoCollectSwapListener<K, V> swapLsnr = null; + GridCacheEntryInfoCollectSwapListener swapLsnr = null; try { if (cctx.isSwapOrOffheapEnabled()) { - swapLsnr = new GridCacheEntryInfoCollectSwapListener<>(log, cctx); + swapLsnr = new GridCacheEntryInfoCollectSwapListener(log, cctx); cctx.swap().addOffHeapListener(part, swapLsnr); cctx.swap().addSwapListener(part, swapLsnr); @@ -306,11 +306,11 @@ class GridDhtPartitionSupplyPool<K, V> { if (preloadThrottle > 0) U.sleep(preloadThrottle); - s = new GridDhtPartitionSupplyMessage<>(d.workerId(), d.updateSequence(), + s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), cctx.cacheId()); } - GridCacheEntryInfo<K, V> info = e.info(); + GridCacheEntryInfo info = e.info(); if (info != null && !(info.key() instanceof GridPartitionLockKey) && !info.isNew()) { if (preloadPred == null || preloadPred.apply(info)) @@ -325,7 +325,7 @@ class GridDhtPartitionSupplyPool<K, V> { continue; if (cctx.isSwapOrOffheapEnabled()) { - GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> iter = + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = cctx.swap().iterator(part, false); // Iterator may be null if space does not exist. @@ -333,7 +333,7 @@ class GridDhtPartitionSupplyPool<K, V> { try { boolean prepared = false; - for (Map.Entry<byte[], GridCacheSwapEntry<V>> e : iter) { + for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) { if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { // Demander no longer needs this partition, // so we send '-1' partition and move on. @@ -358,27 +358,27 @@ class GridDhtPartitionSupplyPool<K, V> { if (preloadThrottle > 0) U.sleep(preloadThrottle); - s = new GridDhtPartitionSupplyMessage<>(d.workerId(), + s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), cctx.cacheId()); } - GridCacheSwapEntry<V> swapEntry = e.getValue(); + GridCacheSwapEntry swapEntry = e.getValue(); - GridCacheEntryInfo<K, V> info = new GridCacheEntryInfo<>(); + GridCacheEntryInfo info = new GridCacheEntryInfo(); - info.keyBytes(e.getKey()); info.ttl(swapEntry.ttl()); info.expireTime(swapEntry.expireTime()); info.version(swapEntry.version()); - - if (!swapEntry.valueIsByteArray()) { - if (convertPortable) - info.valueBytes(cctx.convertPortableBytes(swapEntry.valueBytes())); - else - info.valueBytes(swapEntry.valueBytes()); - } - else - info.value(swapEntry.value()); +// TODO IGNITE-51. +// if (!swapEntry.valueIsByteArray()) { +// if (convertPortable) +// info.valueBytes(cctx.convertPortableBytes(swapEntry.valueBytes())); +// else +// info.valueBytes(swapEntry.valueBytes()); +// } +// else +// info.value(swapEntry.value()); + info.value(swapEntry.value()); if (preloadPred == null || preloadPred.apply(info)) s.addEntry0(part, info, cctx.shared()); @@ -425,11 +425,11 @@ class GridDhtPartitionSupplyPool<K, V> { } if (swapLsnr != null) { - Collection<GridCacheEntryInfo<K, V>> entries = swapLsnr.entries(); + Collection<GridCacheEntryInfo> entries = swapLsnr.entries(); swapLsnr = null; - for (GridCacheEntryInfo<K, V> info : entries) { + for (GridCacheEntryInfo info : entries) { if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { // Demander no longer needs this partition, // so we send '-1' partition and move on. @@ -449,7 +449,8 @@ class GridDhtPartitionSupplyPool<K, V> { if (!reply(node, d, s)) return; - s = new GridDhtPartitionSupplyMessage<>(d.workerId(), d.updateSequence(), + s = new GridDhtPartitionSupplyMessage(d.workerId(), + d.updateSequence(), cctx.cacheId()); } @@ -494,7 +495,7 @@ class GridDhtPartitionSupplyPool<K, V> { * @return {@code True} if message was sent, {@code false} if recipient left grid. * @throws IgniteCheckedException If failed. */ - private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage<K, V> d, GridDhtPartitionSupplyMessage<K, V> s) + private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) throws IgniteCheckedException { try { if (log.isDebugEnabled()) @@ -516,7 +517,7 @@ class GridDhtPartitionSupplyPool<K, V> { /** * Demand message wrapper. */ - private static class DemandMessage<K, V> extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage<K, V>> { + private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> { /** */ private static final long serialVersionUID = 0L; @@ -524,7 +525,7 @@ class GridDhtPartitionSupplyPool<K, V> { * @param sndId Sender ID. * @param msg Message. */ - DemandMessage(UUID sndId, GridDhtPartitionDemandMessage<K, V> msg) { + DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) { super(sndId, msg); } @@ -545,7 +546,7 @@ class GridDhtPartitionSupplyPool<K, V> { /** * @return Message. */ - public GridDhtPartitionDemandMessage<K, V> message() { + public GridDhtPartitionDemandMessage message() { return get2(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index db9bd08..1918f61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -125,10 +125,10 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon * Messages received on non-coordinator are stored in case if this node * becomes coordinator. */ - private final Map<UUID, GridDhtPartitionsSingleMessage<K, V>> singleMsgs = new ConcurrentHashMap8<>(); + private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>(); /** Messages received from new coordinator. */ - private final Map<UUID, GridDhtPartitionsFullMessage<K, V>> fullMsgs = new ConcurrentHashMap8<>(); + private final Map<UUID, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>(); /** */ @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"}) @@ -436,11 +436,11 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); - for (Map.Entry<UUID, GridDhtPartitionsSingleMessage<K, V>> m : singleMsgs.entrySet()) + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet()) // If received any messages, process them. onReceive(m.getKey(), m.getValue()); - for (Map.Entry<UUID, GridDhtPartitionsFullMessage<K, V>> m : fullMsgs.entrySet()) + for (Map.Entry<UUID, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) // If received any messages, process them. onReceive(m.getKey(), m.getValue()); @@ -551,7 +551,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon * @throws IgniteCheckedException If failed. */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException { - GridDhtPartitionsSingleMessage<K, V> m = new GridDhtPartitionsSingleMessage<>(id, cctx.versions().last()); + GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last()); for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) @@ -571,7 +571,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon */ private void sendAllPartitions(Collection<? extends ClusterNode> nodes, GridDhtPartitionExchangeId id) throws IgniteCheckedException { - GridDhtPartitionsFullMessage<K, V> m = new GridDhtPartitionsFullMessage<>(id, lastVer.get(), + GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(id, + lastVer.get(), id.topologyVersion()); for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { @@ -695,7 +696,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon * @param nodeId Sender node id. * @param msg Single partition info. */ - public void onReceive(final UUID nodeId, final GridDhtPartitionsSingleMessage<K, V> msg) { + public void onReceive(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) { assert msg != null; assert msg.exchangeId().equals(exchId); @@ -788,7 +789,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon * @param nodeId Sender node ID. * @param msg Full partition info. */ - public void onReceive(final UUID nodeId, final GridDhtPartitionsFullMessage<K, V> msg) { + public void onReceive(final UUID nodeId, final GridDhtPartitionsFullMessage msg) { assert msg != null; if (isDone()) { @@ -847,7 +848,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon * * @param msg Partitions full messages. */ - private void updatePartitionFullMap(GridDhtPartitionsFullMessage<K, V> msg) { + private void updatePartitionFullMap(GridDhtPartitionsFullMessage msg) { for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { Integer cacheId = entry.getKey(); @@ -865,7 +866,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon * * @param msg Partitions single message. */ - private void updatePartitionSingleMap(GridDhtPartitionsSingleMessage<K, V> msg) { + private void updatePartitionSingleMap(GridDhtPartitionsSingleMessage msg) { for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { Integer cacheId = entry.getKey(); GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); @@ -952,10 +953,10 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon if (set) { // If received any messages, process them. - for (Map.Entry<UUID, GridDhtPartitionsSingleMessage<K, V>> m : singleMsgs.entrySet()) + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet()) onReceive(m.getKey(), m.getValue()); - for (Map.Entry<UUID, GridDhtPartitionsFullMessage<K, V>> m : fullMsgs.entrySet()) + for (Map.Entry<UUID, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) onReceive(m.getKey(), m.getValue()); // Reassign oldest node and resend. @@ -1000,7 +1001,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon if (!remaining.isEmpty()) { try { cctx.io().safeSend(cctx.discovery().nodes(remaining), - new GridDhtPartitionsSingleRequest<K, V>(exchId), SYSTEM_POOL, null); + new GridDhtPartitionsSingleRequest(exchId), SYSTEM_POOL, null); } catch (IgniteCheckedException e) { U.error(log, "Failed to request partitions from nodes [exchangeId=" + exchId +