http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 80a3470..eb9c9b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -129,29 +129,29 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { log.debug("Starting DHT preloader..."); cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysRequest.class, - new MessageHandler<GridDhtForceKeysRequest<K, V>>() { - @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest<K, V> msg) { + new MessageHandler<GridDhtForceKeysRequest>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { processForceKeysRequest(node, msg); } }); cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysResponse.class, - new MessageHandler<GridDhtForceKeysResponse<K, V>>() { - @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse<K, V> msg) { + new MessageHandler<GridDhtForceKeysResponse>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { processForceKeyResponse(node, msg); } }); cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentRequest.class, - new MessageHandler<GridDhtAffinityAssignmentRequest<K, V>>() { - @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest<K, V> msg) { + new 
MessageHandler<GridDhtAffinityAssignmentRequest>() { + @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest msg) { processAffinityAssignmentRequest(node, msg); } }); cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentResponse.class, - new MessageHandler<GridDhtAffinityAssignmentResponse<K, V>>() { - @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentResponse<K, V> msg) { + new MessageHandler<GridDhtAffinityAssignmentResponse>() { + @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentResponse msg) { processAffinityAssignmentResponse(node, msg); } }); @@ -189,7 +189,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred) { + @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) { super.preloadPredicate(preloadPred); assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()"; @@ -316,7 +316,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @param node Node originated request. * @param msg Force keys message. */ - private void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest<K, V> msg) { + private void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) { IgniteInternalFuture<?> fut = cctx.mvcc().finishKeys(msg.keys(), msg.topologyVersion()); if (fut.isDone()) @@ -333,22 +333,22 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @param node Node originated request. * @param msg Force keys message. 
*/ - private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest<K, V> msg) { + private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) { if (!enterBusy()) return; try { ClusterNode loc = cctx.localNode(); - GridDhtForceKeysResponse<K, V> res = new GridDhtForceKeysResponse<>( + GridDhtForceKeysResponse res = new GridDhtForceKeysResponse( cctx.cacheId(), msg.futureId(), msg.miniId()); - for (K k : msg.keys()) { + for (KeyCacheObject k : msg.keys()) { int p = cctx.affinity().partition(k); - GridDhtLocalPartition<K, V> locPart = top.localPartition(p, -1, false); + GridDhtLocalPartition locPart = top.localPartition(p, -1, false); // If this node is no longer an owner. if (locPart == null && !top.owners(p).contains(loc)) @@ -368,7 +368,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { // after the message was received. In that case, we are // confident that primary node knows of any changes to the key. if (entry != null) { - GridCacheEntryInfo<K, V> info = entry.info(); + GridCacheEntryInfo info = entry.info(); if (info != null && !info.isNew()) res.addInfo(info); @@ -399,7 +399,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @param node Node. * @param msg Message. */ - private void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse<K, V> msg) { + private void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) { if (!enterBusy()) return; @@ -422,7 +422,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @param req Request. 
*/ private void processAffinityAssignmentRequest(final ClusterNode node, - final GridDhtAffinityAssignmentRequest<K, V> req) { + final GridDhtAffinityAssignmentRequest req) { final long topVer = req.topologyVersion(); if (log.isDebugEnabled()) @@ -438,7 +438,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { try { cctx.io().send(node, - new GridDhtAffinityAssignmentResponse<K, V>(cctx.cacheId(), topVer, assignment), AFFINITY_POOL); + new GridDhtAffinityAssignmentResponse(cctx.cacheId(), topVer, assignment), AFFINITY_POOL); } catch (IgniteCheckedException e) { U.error(log, "Failed to send affinity assignment response to remote node [node=" + node + ']', e); @@ -451,7 +451,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @param node Node. * @param res Response. */ - private void processAffinityAssignmentResponse(ClusterNode node, GridDhtAffinityAssignmentResponse<K, V> res) { + private void processAffinityAssignmentResponse(ClusterNode node, GridDhtAffinityAssignmentResponse res) { if (log.isDebugEnabled()) log.debug("Processing affinity assignment response [node=" + node + ", res=" + res + ']'); @@ -465,7 +465,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @param part Evicted partition. * @param updateSeq Update sequence. */ - public void onPartitionEvicted(GridDhtLocalPartition<K, V> part, boolean updateSeq) { + public void onPartitionEvicted(GridDhtLocalPartition part, boolean updateSeq) { if (!enterBusy()) return; @@ -488,7 +488,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @return Future for request. */ @SuppressWarnings( {"unchecked", "RedundantCast"}) - @Override public GridDhtFuture<Object> request(Collection<? 
extends K> keys, long topVer) { + @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, long topVer) { final GridDhtForceKeysFuture<K, V> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index fafec8d..823d186 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -27,7 +27,7 @@ import java.util.concurrent.*; * Partition to node assignments. */ public class GridDhtPreloaderAssignments<K, V> extends - ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage<K, V>> { + ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 89cdb98..5bfae1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -55,7 +55,7 @@ public class GridNearAtomicCache<K, 
V> extends GridNearCacheAdapter<K, V> { private GridDhtCacheAdapter<K, V> dht; /** Remove queue. */ - private GridCircularBuffer<T2<K, GridCacheVersion>> rmvQueue; + private GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue; /** * Empty constructor required for {@link Externalizable}. @@ -77,8 +77,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse<K, V>>() { - @Override public void apply(UUID nodeId, GridNearGetResponse<K, V> res) { + ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + @Override public void apply(UUID nodeId, GridNearGetResponse res) { processGetResponse(nodeId, res); } }); @@ -101,15 +101,15 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { * @param res Update response. */ public void processNearAtomicUpdateResponse( - GridNearAtomicUpdateRequest<K, V> req, - GridNearAtomicUpdateResponse<K, V> res + GridNearAtomicUpdateRequest req, + GridNearAtomicUpdateResponse res ) { /* * Choose value to be stored in near cache: first check key is not in failed and not in skipped list, * then check if value was generated on primary node, if not then use value sent in request. 
*/ - Collection<K> failed = res.failedKeys(); + Collection<KeyCacheObject> failed = res.failedKeys(); List<Integer> nearValsIdxs = res.nearValuesIndexes(); List<Integer> skipped = res.skippedIndexes(); @@ -128,7 +128,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { if (F.contains(skipped, i)) continue; - K key = req.keys().get(i); + KeyCacheObject key = req.keys().get(i); if (F.contains(failed, key)) continue; @@ -142,7 +142,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { continue; } - V val = null; + CacheObject val = null; byte[] valBytes = null; if (F.contains(nearValsIdxs, i)) { @@ -197,8 +197,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { */ private void processNearAtomicUpdateResponse( GridCacheVersion ver, - K key, - @Nullable V val, + KeyCacheObject key, + @Nullable CacheObject val, @Nullable byte[] valBytes, long ttl, long expireTime, @@ -217,7 +217,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { GridCacheOperation op = (val != null || valBytes != null) ? 
UPDATE : DELETE; - GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate( + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, nodeId, @@ -232,7 +232,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*metrics*/true, /*primary*/false, /*check version*/true, - CU.<K, V>empty(), + CU.empty(), DR_NONE, ttl, expireTime, @@ -271,21 +271,21 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { */ public void processDhtAtomicUpdateRequest( UUID nodeId, - GridDhtAtomicUpdateRequest<K, V> req, - GridDhtAtomicUpdateResponse<K, V> res + GridDhtAtomicUpdateRequest req, + GridDhtAtomicUpdateResponse res ) { GridCacheVersion ver = req.writeVersion(); assert ver != null; - Collection<K> backupKeys = req.keys(); + Collection<KeyCacheObject> backupKeys = req.keys(); boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); for (int i = 0; i < req.nearSize(); i++) { - K key = req.nearKey(i); + KeyCacheObject key = req.nearKey(i); try { while (true) { @@ -305,9 +305,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { break; } - V val = req.nearValue(i); + CacheObject val = req.nearValue(i); byte[] valBytes = req.nearValueBytes(i); - EntryProcessor<K, V, ?> entryProcessor = req.nearEntryProcessor(i); + EntryProcessor<Object, Object, Object> entryProcessor = req.nearEntryProcessor(i); GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null || valBytes != null) ? 
@@ -317,7 +317,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { long ttl = req.nearTtl(i); long expireTime = req.nearExpireTime(i); - GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate( + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, nodeId, @@ -332,7 +332,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*metrics*/true, /*primary*/false, /*check version*/!req.forceTransformBackups(), - CU.<K, V>empty(), + CU.empty(), DR_NONE, ttl, expireTime, @@ -659,7 +659,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /** {@inheritDoc} */ @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, - @Nullable IgniteTxLocalEx<K, V> tx, + @Nullable IgniteTxLocalEx tx, boolean isInvalidate, boolean isRead, boolean retval, @@ -680,7 +680,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { assert entry.isNear(); try { - T2<K, GridCacheVersion> evicted = rmvQueue.add(new T2<>(entry.key(), ver)); + T2<KeyCacheObject, GridCacheVersion> evicted = rmvQueue.add(new T2<>(entry.key(), ver)); if (evicted != null) removeVersionedEntry(evicted.get1(), evicted.get2()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 2642c5e..a32b6a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -67,13 +67,19 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda /** {@inheritDoc} */ @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { + map.setEntryFactory(new GridCacheMapEntryFactory() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, - V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + @Override public GridCacheMapEntry create(GridCacheContext ctx, + long topVer, KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, + int hdrId) + { // Can't hold any locks here - this method is invoked when // holding write-lock on the whole cache map. - return new GridNearCacheEntry<>(ctx, key, hash, val, next, ttl, hdrId); + return new GridNearCacheEntry(ctx, key, hash, val, next, ttl, hdrId); } }); } @@ -94,12 +100,12 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public GridCacheEntryEx entryEx(K key, boolean touch) { - GridNearCacheEntry<K, V> entry = null; + @Override public GridCacheEntryEx entryEx(KeyCacheObject key, boolean touch) { + GridNearCacheEntry entry = null; while (true) { try { - entry = (GridNearCacheEntry<K, V>)super.entryEx(key, touch); + entry = (GridNearCacheEntry)super.entryEx(key, touch); entry.initializeFromDht(ctx.affinity().affinityTopologyVersion()); @@ -113,12 +119,12 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public GridCacheEntryEx entryEx(K key, long topVer) { - GridNearCacheEntry<K, V> entry = null; + @Override public GridCacheEntryEx entryEx(KeyCacheObject key, long topVer) { + GridNearCacheEntry entry = null; while (true) { try { - entry = (GridNearCacheEntry<K, 
V>)super.entryEx(key, topVer); + entry = (GridNearCacheEntry)super.entryEx(key, topVer); entry.initializeFromDht(topVer); @@ -136,16 +142,16 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda * @param topVer Topology version. * @return Entry. */ - public GridNearCacheEntry<K, V> entryExx(K key, long topVer) { - return (GridNearCacheEntry<K, V>)entryEx(key, topVer); + public GridNearCacheEntry entryExx(KeyCacheObject key, long topVer) { + return (GridNearCacheEntry)entryEx(key, topVer); } /** * @param key Key. * @return Entry. */ - @Nullable public GridNearCacheEntry<K, V> peekExx(K key) { - return (GridNearCacheEntry<K, V>)peekEx(key); + @Nullable public GridNearCacheEntry peekExx(KeyCacheObject key) { + return (GridNearCacheEntry)peekEx(key); } /** {@inheritDoc} */ @@ -178,23 +184,25 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "RedundantCast"}) @Override public IgniteInternalFuture<Object> readThroughAllAsync( - Collection<? extends K> keys, + Collection<KeyCacheObject> keys, boolean reload, boolean skipVals, IgniteInternalTx tx, @Nullable UUID subjId, String taskName, - IgniteBiInClosure<K, V> vis + IgniteBiInClosure<KeyCacheObject, Object> vis ) { - return (IgniteInternalFuture)loadAsync(tx, - keys, - reload, - false, - subjId, - taskName, - true, - null, - skipVals); + return null; +// TODO IGNITE-51. +// return (IgniteInternalFuture)loadAsync(tx, +// keys, +// reload, +// false, +// subjId, +// taskName, +// true, +// null, +// skipVals); } /** {@inheritDoc} */ @@ -323,7 +331,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda * @param nodeId Sender ID. * @param res Response. 
*/ - protected void processGetResponse(UUID nodeId, GridNearGetResponse<K, V> res) { + protected void processGetResponse(UUID nodeId, GridNearGetResponse res) { GridNearGetFuture<K, V> fut = (GridNearGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future( res.version(), res.futureId()); @@ -379,24 +387,24 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda F.flatCollections( F.viewReadOnly( dht().topology().currentLocalPartitions(), - new C1<GridDhtLocalPartition<K, V>, Collection<Cache.Entry<K, V>>>() { - @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition<K, V> p) { + new C1<GridDhtLocalPartition, Collection<Cache.Entry<K, V>>>() { + @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition p) { return F.viewReadOnly( p.entries(), - new C1<GridDhtCacheEntry<K, V>, Cache.Entry<K, V>>() { - @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry<K, V> e) { + new C1<GridDhtCacheEntry, Cache.Entry<K, V>>() { + @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry e) { return e.wrapLazyValue(); } }, - new P1<GridDhtCacheEntry<K, V>>() { - @Override public boolean apply(GridDhtCacheEntry<K, V> e) { + new P1<GridDhtCacheEntry>() { + @Override public boolean apply(GridDhtCacheEntry e) { return !e.obsoleteOrDeleted(); } }); } }, - new P1<GridDhtLocalPartition<K, V>>() { - @Override public boolean apply(GridDhtLocalPartition<K, V> p) { + new P1<GridDhtLocalPartition>() { + @Override public boolean apply(GridDhtLocalPartition p) { return p.primary(topVer); } })); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 52d5010..2f6bebb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -327,7 +327,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { /** {@inheritDoc} */ @Override protected Object readThrough(IgniteInternalTx tx, KeyCacheObject key, boolean reload, UUID subjId, String taskName) throws IgniteCheckedException { - return null. + return null; // TODO IGNTIE-51. // return cctx.near().loadAsync(tx, // F.asList(key), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 1a08ec6..20e5b87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -81,7 +81,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma private GridCacheVersion ver; /** Transaction. */ - private IgniteTxLocalEx<K, V> tx; + private IgniteTxLocalEx tx; /** Logger. 
*/ private IgniteLogger log; @@ -134,7 +134,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma boolean readThrough, boolean reload, boolean forcePrimary, - @Nullable IgniteTxLocalEx<K, V> tx, + @Nullable IgniteTxLocalEx tx, @Nullable UUID subjId, String taskName, boolean deserializePortable, @@ -235,7 +235,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma * @param nodeId Sender. * @param res Result. */ - void onResult(UUID nodeId, GridNearGetResponse<K, V> res) { + void onResult(UUID nodeId, GridNearGetResponse res) { for (IgniteInternalFuture<Map<K, V>> fut : futures()) if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -324,10 +324,12 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma // If this is the primary or backup node for the keys. if (n.isLocal()) { - final GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut = + final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = dht().getDhtAsync(n.id(), -1, - mappedKeys, + // TODO IGNITE-51. + // mappedKeys, + null, readThrough, reload, topVer, @@ -358,8 +360,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma } // Add new future. 
- add(fut.chain(new C1<IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>>, Map<K, V>>() { - @Override public Map<K, V> apply(IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> fut) { + add(fut.chain(new C1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>, Map<K, V>>() { + @Override public Map<K, V> apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut) { try { return loadEntries(n.id(), mappedKeys.keySet(), fut.get(), saved, topVer); } @@ -382,12 +384,14 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer); - GridCacheMessage<K, V> req = new GridNearGetRequest<>( + GridCacheMessage req = new GridNearGetRequest( cctx.cacheId(), futId, fut.futureId(), ver, - mappedKeys, + // TODO IGNITE-51. + // mappedKeys, + null, readThrough, reload, topVer, @@ -427,11 +431,14 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma // Allow to get cached value from the local node. boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer); - GridCacheEntryEx entry = allowLocRead ? near.peekEx(key) : null; + // TODO IGNITE-51. + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + + GridCacheEntryEx entry = allowLocRead ? near.peekEx(cacheKey) : null; while (true) { try { - V v = null; + CacheObject v = null; boolean isNear = entry != null; @@ -456,7 +463,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma GridDhtCacheAdapter<K, V> dht = cache().dht(); try { - entry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key); + entry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(cacheKey) : dht.peekEx(cacheKey); // If near cache does not have value, then we peek DHT cache. 
if (entry != null) { @@ -477,7 +484,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma // Entry was not in memory or in swap, so we remove it from cache. if (v == null && isNew && entry.markObsoleteIfEmpty(ver)) - dht.removeIfObsolete(key); + dht.removeIfObsolete(cacheKey); } if (v != null) { @@ -506,18 +513,19 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (v != null && !reload) { K key0 = key; - if (cctx.portableEnabled()) { - v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); - key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); - } - - add(new GridFinishedFuture<>(cctx.kernalContext(), Collections.singletonMap(key0, v))); +// TODO IGNITE-51. +// if (cctx.portableEnabled()) { +// v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); +// key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); +// } +// +// add(new GridFinishedFuture<>(cctx.kernalContext(), Collections.singletonMap(key0, v))); } else { if (primary == null) primary = cctx.affinity().primary(key, topVer); - GridNearCacheEntry<K, V> nearEntry = allowLocRead ? near.peekExx(key) : null; + GridNearCacheEntry nearEntry = allowLocRead ? near.peekExx(cacheKey) : null; entry = nearEntry; @@ -541,7 +549,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma // Don't add reader if transaction acquires lock anyway to avoid deadlock. boolean addRdr = tx == null || tx.optimistic(); - if (!addRdr && tx.readCommitted() && !tx.writeSet().contains(cctx.txKey(key))) + if (!addRdr && tx.readCommitted() && !tx.writeSet().contains(cctx.txKey(cacheKey))) addRdr = true; LinkedHashMap<K, Boolean> old = mappings.get(primary); @@ -560,7 +568,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma break; } catch (GridCacheEntryRemovedException ignored) { - entry = allowLocRead ? near.peekEx(key) : null; + entry = allowLocRead ? 
near.peekEx(cacheKey) : null; } catch (GridCacheFilterFailedException e) { if (log.isDebugEnabled()) @@ -601,7 +609,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma */ private Map<K, V> loadEntries(UUID nodeId, Collection<K> keys, - Collection<GridCacheEntryInfo<K, V>> infos, + Collection<GridCacheEntryInfo> infos, Map<K, GridCacheVersion> savedVers, long topVer) { boolean empty = F.isEmpty(keys); @@ -613,13 +621,13 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma GridCacheVersion ver = atomic ? null : F.isEmpty(infos) ? null : cctx.versions().next(); - for (GridCacheEntryInfo<K, V> info : infos) { + for (GridCacheEntryInfo info : infos) { try { info.unmarshalValue(cctx, cctx.deploy().globalLoader()); // Entries available locally in DHT should not be loaded into near cache for reading. - if (!cctx.cache().affinity().isPrimaryOrBackup(cctx.localNode(), info.key())) { - GridNearCacheEntry<K, V> entry = cache().entryExx(info.key(), topVer); + if (!cctx.affinity().localNode(info.key(), cctx.affinity().affinityTopologyVersion())) { + GridNearCacheEntry entry = cache().entryExx(info.key(), topVer); GridCacheVersion saved = savedVers.get(info.key()); @@ -627,7 +635,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma entry.loadedValue(tx, nodeId, info.value(), - info.valueBytes(), + null, atomic ? info.version() : ver, info.version(), saved, @@ -640,15 +648,16 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma cctx.evicts().touch(entry, topVer); } - V val = info.value(); - K key = info.key(); - - if (cctx.portableEnabled()) { - val = (V)cctx.unwrapPortableIfNeeded(val, !deserializePortable); - key = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); - } + CacheObject val = info.value(); + KeyCacheObject key = info.key(); - map.put(key, val); +// TODO IGNITE-51. 
+// if (cctx.portableEnabled()) { +// val = (V)cctx.unwrapPortableIfNeeded(val, !deserializePortable); +// key = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); +// } +// +// map.put(key, val); } catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) @@ -782,7 +791,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma /** * @param res Result callback. */ - void onResult(final GridNearGetResponse<K, V> res) { + void onResult(final GridNearGetResponse res) { final Collection<Integer> invalidParts = res.invalidPartitions(); // If error happened on remote node, fail the whole future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 23e4320..ee2a0f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -49,7 +49,7 @@ import static org.apache.ignite.events.EventType.*; * Cache lock future. */ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> - implements GridCacheMvccFuture<K, V, Boolean> { + implements GridCacheMvccFuture<Boolean> { /** */ private static final long serialVersionUID = 0L; @@ -65,7 +65,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B private long threadId; /** Keys to lock. */ - private Collection<? extends K> keys; + private Collection<KeyCacheObject> keys; /** Future ID. 
*/ private IgniteUuid futId; @@ -101,14 +101,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B /** Transaction. */ @GridToStringExclude - private GridNearTxLocal<K, V> tx; + private GridNearTxLocal tx; /** Topology snapshot to operate on. */ private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>(); /** Map of current values. */ - private Map<K, GridTuple3<GridCacheVersion, V, byte[]>> valMap; + private Map<KeyCacheObject, GridTuple3<GridCacheVersion, CacheObject, byte[]>> valMap; /** Trackable flag. */ private boolean trackable = true; @@ -119,7 +119,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B /** Keys locked so far. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) @GridToStringExclude - private List<GridDistributedCacheEntry<K, V>> entries; + private List<GridDistributedCacheEntry> entries; /** TTL for read operation. */ private long accessTtl; @@ -143,8 +143,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B */ public GridNearLockFuture( GridCacheContext<K, V> cctx, - Collection<? extends K> keys, - @Nullable GridNearTxLocal<K, V> tx, + Collection<KeyCacheObject> keys, + @Nullable GridNearTxLocal tx, boolean read, boolean retval, long timeout, @@ -205,7 +205,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B /** * @return Entries. */ - public List<GridDistributedCacheEntry<K, V>> entriesCopy() { + public List<GridDistributedCacheEntry> entriesCopy() { synchronized (mux) { return new ArrayList<>(entries); } @@ -296,7 +296,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @return Lock candidate. * @throws GridCacheEntryRemovedException If entry was removed. 
*/ - @Nullable private GridCacheMvccCandidate addEntry(long topVer, GridNearCacheEntry<K, V> entry, UUID dhtNodeId) + @Nullable private GridCacheMvccCandidate addEntry(long topVer, GridNearCacheEntry entry, UUID dhtNodeId) throws GridCacheEntryRemovedException { // Check if lock acquisition is timed out. if (timedOut) @@ -316,7 +316,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (inTx()) { IgniteTxEntry txEntry = tx.entry(entry.txKey()); - txEntry.cached(entry, txEntry.keyBytes()); + txEntry.cached(entry, null); } if (c != null) @@ -353,7 +353,9 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B private void undoLocks(boolean dist) { // Transactions will undo during rollback. if (dist && tx == null) - cctx.nearTx().removeLocks(lockVer, keys); + cctx.nearTx().removeLocks(lockVer, null); + // TODO IGNITE-51 + // cctx.nearTx().removeLocks(lockVer, keys); else { if (tx != null) { if (tx.setRollbackOnly()) { @@ -445,7 +447,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @param nodeId Sender. * @param res Result. */ - void onResult(UUID nodeId, GridNearLockResponse<K, V> res) { + void onResult(UUID nodeId, GridNearLockResponse res) { if (!isDone()) { if (log.isDebugEnabled()) log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']'); @@ -552,7 +554,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B log.debug("Got removed entry in onOwnerChanged method (will retry): " + cached); // Replace old entry with new one. - entries.set(i, (GridDistributedCacheEntry<K, V>)cctx.cache().entryEx(cached.key())); + entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(cached.key())); } } } @@ -724,7 +726,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * * @param keys Keys. */ - private void map(Iterable<? 
extends K> keys) { + private void map(Iterable<KeyCacheObject> keys) { try { GridDiscoveryTopologySnapshot snapshot = topSnapshot.get(); @@ -741,14 +743,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B return; } - ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings = + ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>(); // Assign keys to primary nodes. - GridNearLockMapping<K, V> map = null; + GridNearLockMapping map = null; - for (K key : keys) { - GridNearLockMapping<K, V> updated = map(key, map, topVer); + for (KeyCacheObject key : keys) { + GridNearLockMapping updated = map(key, map, topVer); // If new mapping was created, add to collection. if (updated != map) { @@ -772,30 +774,30 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); // Create mini futures. - for (Iterator<GridNearLockMapping<K, V>> iter = mappings.iterator(); iter.hasNext(); ) { - GridNearLockMapping<K, V> mapping = iter.next(); + for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) { + GridNearLockMapping mapping = iter.next(); ClusterNode node = mapping.node(); - Collection<K> mappedKeys = mapping.mappedKeys(); + Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys(); assert !mappedKeys.isEmpty(); - GridNearLockRequest<K, V> req = null; + GridNearLockRequest req = null; - Collection<K> distributedKeys = new ArrayList<>(mappedKeys.size()); + Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size()); boolean explicit = false; - for (K key : mappedKeys) { - IgniteTxKey<K> txKey = cctx.txKey(key); + for (KeyCacheObject key : mappedKeys) { + IgniteTxKey txKey = cctx.txKey(key); while (true) { - GridNearCacheEntry<K, V> entry = null; + GridNearCacheEntry entry = null; try { entry = cctx.near().entryExx(key, topVer); - if 
(!cctx.isAll(entry.wrapLazyValue(), filter)) { + if (!cctx.isAll(entry.<K, V>wrapLazyValue(), filter)) { if (log.isDebugEnabled()) log.debug("Entry being locked did not pass filter (will not lock): " + entry); @@ -819,10 +821,10 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (tx == null && !cand.reentry()) cctx.mvcc().addExplicitLock(threadId, cand, snapshot); - GridTuple3<GridCacheVersion, V, byte[]> val = entry.versionedValue(); + GridTuple3<GridCacheVersion, CacheObject, byte[]> val = entry.versionedValue(); if (val == null) { - GridDhtCacheEntry<K, V> dhtEntry = dht().peekExx(key); + GridDhtCacheEntry dhtEntry = dht().peekExx(key); try { if (dhtEntry != null) @@ -848,7 +850,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (!cand.reentry()) { if (req == null) { - req = new GridNearLockRequest<>( + req = new GridNearLockRequest( cctx.cacheId(), topVer, cctx.nodeId(), @@ -940,20 +942,21 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @param mappings Queue of mappings. * @throws IgniteCheckedException If mapping can not be completed. */ - private void proceedMapping(final ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings) + @SuppressWarnings("unchecked") + private void proceedMapping(final ConcurrentLinkedDeque8<GridNearLockMapping> mappings) throws IgniteCheckedException { - GridNearLockMapping<K, V> map = mappings.poll(); + GridNearLockMapping map = mappings.poll(); // If there are no more mappings to process, complete the future. 
if (map == null) return; - final GridNearLockRequest<K, V> req = map.request(); - final Collection<K> mappedKeys = map.distributedKeys(); + final GridNearLockRequest req = map.request(); + final Collection<KeyCacheObject> mappedKeys = map.distributedKeys(); final ClusterNode node = map.node(); if (filter != null && filter.length != 0) - req.filter(filter, cctx); + req.filter((IgnitePredicate[])filter, cctx); if (node.isLocal()) { req.miniId(IgniteUuid.randomUuid()); @@ -961,14 +964,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (log.isDebugEnabled()) log.debug("Before locally locking near request: " + req); - IgniteInternalFuture<GridNearLockResponse<K, V>> fut = dht().lockAllAsync(cctx, cctx.localNode(), req, filter); + IgniteInternalFuture<GridNearLockResponse> fut = dht().lockAllAsync(cctx, cctx.localNode(), req, filter); // Add new future. - add(new GridEmbeddedFuture<>( + add(new GridEmbeddedFuture( cctx.kernalContext(), fut, - new C2<GridNearLockResponse<K, V>, Exception, Boolean>() { - @Override public Boolean apply(GridNearLockResponse<K, V> res, Exception e) { + new C2<GridNearLockResponse, Exception, Boolean>() { + @Override public Boolean apply(GridNearLockResponse res, Exception e) { if (CU.isLockTimeoutOrCancelled(e) || (res != null && CU.isLockTimeoutOrCancelled(res.error()))) return false; @@ -998,16 +1001,17 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B try { int i = 0; - for (K k : mappedKeys) { + for (KeyCacheObject k : mappedKeys) { while (true) { - GridNearCacheEntry<K, V> entry = cctx.near().entryExx(k, req.topologyVersion()); + GridNearCacheEntry entry = cctx.near().entryExx(k, req.topologyVersion()); try { - GridTuple3<GridCacheVersion, V, byte[]> oldValTup = valMap.get(entry.key()); + GridTuple3<GridCacheVersion, CacheObject, byte[]> oldValTup = + valMap.get(entry.key()); boolean hasBytes = entry.hasValue(); - V oldVal = entry.rawGet(); - V newVal = 
res.value(i); + CacheObject oldVal = entry.rawGet(); + CacheObject newVal = res.value(i); byte[] newBytes = res.valueBytes(i); GridCacheVersion dhtVer = res.dhtVersion(i); @@ -1038,7 +1042,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (inTx() && implicitTx() && tx.onePhaseCommit()) { boolean pass = res.filterResult(i); - tx.entry(cctx.txKey(k)).filters(pass ? CU.<K, V>empty() : CU.<K, V>alwaysFalse()); + tx.entry(cctx.txKey(k)).filters(pass ? CU.empty() : CU.alwaysFalse()); } if (record) { @@ -1073,7 +1077,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B "removed (will renew)."); // Replace old entry with new one. - entries.set(i, (GridDistributedCacheEntry<K, V>) + entries.set(i, (GridDistributedCacheEntry) cctx.cache().entryEx(entry.key())); } } @@ -1150,7 +1154,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @return Near lock mapping. * @throws IgniteCheckedException If mapping for key failed. */ - private GridNearLockMapping<K, V> map(K key, @Nullable GridNearLockMapping<K, V> mapping, + private GridNearLockMapping map(KeyCacheObject key, @Nullable GridNearLockMapping mapping, long topVer) throws IgniteCheckedException { assert mapping == null || mapping.node() != null; @@ -1165,7 +1169,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B " key) [key=" + key + ", primaryNodeId=" + primary.id() + ']'); if (mapping == null || !primary.id().equals(mapping.node().id())) - mapping = new GridNearLockMapping<>(primary, key); + mapping = new GridNearLockMapping(primary, key); else mapping.addKey(key); @@ -1236,11 +1240,11 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B /** Keys. */ @GridToStringInclude - private Collection<K> keys; + private Collection<KeyCacheObject> keys; /** Mappings to proceed. 
*/ @GridToStringExclude - private ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings; + private ConcurrentLinkedDeque8<GridNearLockMapping> mappings; /** */ private AtomicBoolean rcvRes = new AtomicBoolean(false); @@ -1257,8 +1261,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @param keys Keys. * @param mappings Mappings to proceed. */ - MiniFuture(ClusterNode node, Collection<K> keys, - ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings) { + MiniFuture(ClusterNode node, Collection<KeyCacheObject> keys, + ConcurrentLinkedDeque8<GridNearLockMapping> mappings) { super(cctx.kernalContext()); this.node = node; @@ -1283,7 +1287,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B /** * @return Keys. */ - public Collection<K> keys() { + public Collection<KeyCacheObject> keys() { return keys; } @@ -1327,7 +1331,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B /** * @param res Result callback. 
*/ - void onResult(GridNearLockResponse<K, V> res) { + void onResult(GridNearLockResponse res) { if (rcvRes.compareAndSet(false, true)) { if (res.error() != null) { if (log.isDebugEnabled()) @@ -1347,9 +1351,9 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B long topVer = topSnapshot.get().topologyVersion(); - for (K k : keys) { + for (KeyCacheObject k : keys) { while (true) { - GridNearCacheEntry<K, V> entry = cctx.near().entryExx(k, topVer); + GridNearCacheEntry entry = cctx.near().entryExx(k, topVer); try { if (res.dhtVersion(i) == null) { @@ -1359,11 +1363,11 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B return; } - GridTuple3<GridCacheVersion, V, byte[]> oldValTup = valMap.get(entry.key()); + GridTuple3<GridCacheVersion, CacheObject, byte[]> oldValTup = valMap.get(entry.key()); - V oldVal = entry.rawGet(); + CacheObject oldVal = entry.rawGet(); boolean hasOldVal = false; - V newVal = res.value(i); + CacheObject newVal = res.value(i); byte[] newBytes = res.valueBytes(i); boolean readRecordable = false; @@ -1397,7 +1401,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (inTx() && implicitTx() && tx.onePhaseCommit()) { boolean pass = res.filterResult(i); - tx.entry(cctx.txKey(k)).filters(pass ? CU.<K, V>empty() : CU.<K, V>alwaysFalse()); + tx.entry(cctx.txKey(k)).filters(pass ? CU.empty() : CU.alwaysFalse()); } entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(), @@ -1433,7 +1437,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B log.debug("Failed to add candidates because entry was removed (will renew)."); // Replace old entry with new one. 
- entries.set(i, (GridDistributedCacheEntry<K, V>)cctx.cache().entryEx(entry.key())); + entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); } catch (IgniteCheckedException e) { onDone(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java index 8d3b440..51000ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -27,21 +28,21 @@ import java.util.*; /** * Key-to node mapping. */ -public class GridNearLockMapping<K, V> { +public class GridNearLockMapping { /** Node to which keys are mapped. */ private ClusterNode node; /** Collection of mapped keys. */ @GridToStringInclude - private Collection<K> mappedKeys = new LinkedList<>(); + private Collection<KeyCacheObject> mappedKeys = new LinkedList<>(); /** Near lock request. */ @GridToStringExclude - private GridNearLockRequest<K, V> req; + private GridNearLockRequest req; /** Distributed keys. Key will not be distributed if lock is reentry. 
*/ @GridToStringInclude - private Collection<K> distributedKeys; + private Collection<KeyCacheObject> distributedKeys; /** * Creates near lock mapping for specified node and key. @@ -49,7 +50,7 @@ public class GridNearLockMapping<K, V> { * @param node Node. * @param firstKey First key in mapped keys collection. */ - public GridNearLockMapping(ClusterNode node, K firstKey) { + public GridNearLockMapping(ClusterNode node, KeyCacheObject firstKey) { assert node != null; assert firstKey != null; @@ -68,28 +69,28 @@ public class GridNearLockMapping<K, V> { /** * @return Mapped keys. */ - public Collection<K> mappedKeys() { + public Collection<KeyCacheObject> mappedKeys() { return mappedKeys; } /** * @param key Key to add to mapping. */ - public void addKey(K key) { + public void addKey(KeyCacheObject key) { mappedKeys.add(key); } /** * @return Near lock request. */ - @Nullable public GridNearLockRequest<K, V> request() { + @Nullable public GridNearLockRequest request() { return req; } /** * @param req Near lock request. */ - public void request(GridNearLockRequest<K, V> req) { + public void request(GridNearLockRequest req) { assert req != null; this.req = req; @@ -98,14 +99,14 @@ public class GridNearLockMapping<K, V> { /** * @return Collection of distributed keys. */ - public Collection<K> distributedKeys() { + public Collection<KeyCacheObject> distributedKeys() { return distributedKeys; } /** * @param distributedKeys Collection of distributed keys. 
*/ - public void distributedKeys(Collection<K> distributedKeys) { + public void distributedKeys(Collection<KeyCacheObject> distributedKeys) { this.distributedKeys = distributedKeys; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index b01a47a..d5229a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -69,14 +69,14 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse<K, V>>() { - @Override public void apply(UUID nodeId, GridNearGetResponse<K, V> res) { + ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + @Override public void apply(UUID nodeId, GridNearGetResponse res) { processGetResponse(nodeId, res); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse<K, V>>() { - @Override public void apply(UUID nodeId, GridNearLockResponse<K, V> res) { + ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { + @Override public void apply(UUID nodeId, GridNearLockResponse res) { processLockResponse(nodeId, res); } }); @@ -111,12 +111,12 @@ public class 
GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (F.isEmpty(keys)) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); - IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(); + IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(); if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { - @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter<K, V> tx) { - return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, entry, deserializePortable, skipVals)); + @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) { + return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, entry, deserializePortable, skipVals)); } }); } @@ -144,7 +144,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @param expiryPlc Expiry policy. * @return Future. */ - IgniteInternalFuture<Map<K, V>> txLoadAsync(GridNearTxLocal<K, V> tx, + IgniteInternalFuture<Map<K, V>> txLoadAsync(GridNearTxLocal tx, @Nullable Collection<? extends K> keys, boolean readThrough, boolean deserializePortable, @@ -175,19 +175,19 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @param req Request. 
*/ @SuppressWarnings({"RedundantTypeArguments"}) - public void clearLocks(UUID nodeId, GridDhtUnlockRequest<K, V> req) { + public void clearLocks(UUID nodeId, GridDhtUnlockRequest req) { assert nodeId != null; GridCacheVersion obsoleteVer = ctx.versions().next(); - List<K> keys = req.nearKeys(); + List<KeyCacheObject> keys = req.nearKeys(); if (keys != null) { long topVer = ctx.affinity().affinityTopologyVersion(); - for (K key : keys) { + for (KeyCacheObject key : keys) { while (true) { - GridDistributedCacheEntry<K, V> entry = peekExx(key); + GridDistributedCacheEntry entry = peekExx(key); try { if (entry != null) { @@ -240,27 +240,27 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @throws GridDistributedLockCancelledException If lock has been cancelled. */ @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable public GridNearTxRemote<K, V> startRemoteTx(UUID nodeId, GridDhtLockRequest<K, V> req) + @Nullable public GridNearTxRemote startRemoteTx(UUID nodeId, GridDhtLockRequest req) throws IgniteCheckedException, GridDistributedLockCancelledException { - List<K> nearKeys = req.nearKeys(); + List<KeyCacheObject> nearKeys = req.nearKeys(); List<byte[]> keyBytes = req.nearKeyBytes(); assert keyBytes != null; - GridNearTxRemote<K, V> tx = null; + GridNearTxRemote tx = null; ClassLoader ldr = ctx.deploy().globalLoader(); if (ldr != null) { - Collection<IgniteTxKey<K>> evicted = null; + Collection<IgniteTxKey> evicted = null; for (int i = 0; i < nearKeys.size(); i++) { - K key = nearKeys.get(i); + KeyCacheObject key = nearKeys.get(i); if (key == null) continue; - IgniteTxKey<K> txKey = ctx.txKey(key); + IgniteTxKey txKey = ctx.txKey(key); byte[] bytes = !keyBytes.isEmpty() ? 
keyBytes.get(i) : null; @@ -269,7 +269,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (log.isDebugEnabled()) log.debug("Unmarshalled key: " + key); - GridNearCacheEntry<K, V> entry = null; + GridNearCacheEntry entry = null; while (true) { try { @@ -283,7 +283,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> tx = ctx.tm().nearTx(req.version()); if (tx == null) { - tx = new GridNearTxRemote<>( + tx = new GridNearTxRemote( ctx.shared(), nodeId, req.nearNodeId(), @@ -376,7 +376,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (tx != null && evicted != null) { assert !evicted.isEmpty(); - for (IgniteTxKey<K> evict : evicted) + for (IgniteTxKey evict : evicted) tx.addEvicted(evict); } } @@ -395,7 +395,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @param nodeId Node ID. * @param res Response. */ - private void processLockResponse(UUID nodeId, GridNearLockResponse<K, V> res) { + private void processLockResponse(UUID nodeId, GridNearLockResponse res) { assert nodeId != null; assert res != null; @@ -410,7 +410,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> @Override protected IgniteInternalFuture<Boolean> lockAllAsync( Collection<? 
extends K> keys, long timeout, - IgniteTxLocalEx<K, V> tx, + IgniteTxLocalEx tx, boolean isInvalidate, boolean isRead, boolean retval, @@ -419,8 +419,10 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> IgnitePredicate<Cache.Entry<K, V>>[] filter ) { GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx, - keys, - (GridNearTxLocal<K, V>)tx, + // TODO IGNITE-51 + // keys, + null, + (GridNearTxLocal)tx, isRead, retval, timeout, @@ -476,15 +478,17 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> int keyCnt = -1; - Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null; + Map<ClusterNode, GridNearUnlockRequest> map = null; - Collection<K> locKeys = new LinkedList<>(); + Collection<KeyCacheObject> locKeys = new LinkedList<>(); for (K key : keys) { while (true) { - GridDistributedCacheEntry<K, V> entry = peekExx(key); + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - if (entry == null || !ctx.isAll(entry.wrapLazyValue(), filter)) + GridDistributedCacheEntry entry = peekExx(cacheKey); + + if (entry == null || !ctx.isAll(entry.<K, V>wrapLazyValue(), filter)) break; // While. try { @@ -513,10 +517,10 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> // Send request to remove from remote nodes. 
ClusterNode primary = ctx.affinity().primary(key, topVer); - GridNearUnlockRequest<K, V> req = map.get(primary); + GridNearUnlockRequest req = map.get(primary); if (req == null) { - map.put(primary, req = new GridNearUnlockRequest<>(ctx.cacheId(), keyCnt)); + map.put(primary, req = new GridNearUnlockRequest(ctx.cacheId(), keyCnt)); req.version(ver); } @@ -539,7 +543,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> ctx); } else - locKeys.add(key); + locKeys.add(cacheKey); if (log.isDebugEnabled()) log.debug("Removed lock (will distribute): " + rmv); @@ -569,10 +573,10 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (ver == null) return; - for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping : map.entrySet()) { + for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) { ClusterNode n = mapping.getKey(); - GridDistributedUnlockRequest<K, V> req = mapping.getValue(); + GridDistributedUnlockRequest req = mapping.getValue(); if (n.isLocal()) dht.removeLocks(ctx.nodeId(), req.version(), locKeys, true); @@ -601,14 +605,17 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> try { int keyCnt = -1; - Map<ClusterNode, GridNearUnlockRequest<K, V>> map = null; + Map<ClusterNode, GridNearUnlockRequest> map = null; for (K key : keys) { // Send request to remove from remote nodes. - GridNearUnlockRequest<K, V> req = null; + GridNearUnlockRequest req = null; while (true) { - GridDistributedCacheEntry<K, V> entry = peekExx(key); + // TODO IGNITE-51. 
+ KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); + + GridDistributedCacheEntry entry = peekExx(cacheKey); try { if (entry != null) { @@ -632,7 +639,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> req = map.get(primary); if (req == null) { - map.put(primary, req = new GridNearUnlockRequest<>(ctx.cacheId(), keyCnt)); + map.put(primary, req = new GridNearUnlockRequest(ctx.cacheId(), keyCnt)); req.version(ver); } @@ -641,7 +648,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> // Remove candidate from local node first. if (entry.removeLock(cand.version())) { if (primary.isLocal()) { - dht.removeLocks(primary.id(), ver, F.asList(key), true); + dht.removeLocks(primary.id(), ver, F.asList(cacheKey), true); assert req == null; @@ -672,10 +679,10 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver); Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver); - for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping : map.entrySet()) { + for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) { ClusterNode n = mapping.getKey(); - GridDistributedUnlockRequest<K, V> req = mapping.getValue(); + GridDistributedUnlockRequest req = mapping.getValue(); if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys())) { req.completedVersions(committed, rolledback); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 
ea46f8b..a8804dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -60,7 +60,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** Transaction. */ @GridToStringExclude - private GridNearTxLocal<K, V> tx; + private GridNearTxLocal tx; /** Commit flag. */ private boolean commit; @@ -72,7 +72,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu private AtomicReference<Throwable> err = new AtomicReference<>(null); /** Node mappings. */ - private ConcurrentMap<UUID, GridDistributedTxMapping<K, V>> mappings; + private ConcurrentMap<UUID, GridDistributedTxMapping> mappings; /** Trackable flag. */ private boolean trackable = true; @@ -89,7 +89,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @param tx Transaction. * @param commit Commit flag. */ - public GridNearTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridNearTxLocal<K, V> tx, boolean commit) { + public GridNearTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridNearTxLocal tx, boolean commit) { super(cctx.kernalContext(), F.<IgniteInternalTx>identityReducer(tx)); assert cctx != null; @@ -197,7 +197,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @param nodeId Sender. * @param res Result. 
*/ - public void onResult(UUID nodeId, GridNearTxFinishResponse<K, V> res) { + public void onResult(UUID nodeId, GridNearTxFinishResponse res) { if (!isDone()) for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) { if (isMini(fut)) { @@ -225,7 +225,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu long topVer = this.tx.topologyVersion(); for (IgniteTxEntry e : this.tx.writeMap().values()) { - GridCacheContext<K, V> cacheCtx = e.context(); + GridCacheContext cacheCtx = e.context(); try { if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) { @@ -280,7 +280,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu if (tx.onePhaseCommit()) { // No need to send messages as transaction was already committed on remote node. // Finish local mapping only as we need send commit message to backups. - for (GridDistributedTxMapping<K, V> m : mappings.values()) { + for (GridDistributedTxMapping m : mappings.values()) { if (m.node().isLocal()) { IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finishColocatedLocal(commit, tx); @@ -329,21 +329,21 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** * @param mappings Mappings. */ - private void finish(Iterable<GridDistributedTxMapping<K, V>> mappings) { + private void finish(Iterable<GridDistributedTxMapping> mappings) { // Create mini futures. - for (GridDistributedTxMapping<K, V> m : mappings) + for (GridDistributedTxMapping m : mappings) finish(m); } /** * @param m Mapping. 
*/ - private void finish(GridDistributedTxMapping<K, V> m) { + private void finish(GridDistributedTxMapping m) { ClusterNode n = m.node(); assert !m.empty(); - GridNearTxFinishRequest<K, V> req = new GridNearTxFinishRequest<>( + GridNearTxFinishRequest req = new GridNearTxFinishRequest( futId, tx.xidVersion(), tx.threadId(), @@ -421,7 +421,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** Keys. */ @GridToStringInclude - private GridDistributedTxMapping<K, V> m; + private GridDistributedTxMapping m; /** * Empty constructor required for {@link Externalizable}. @@ -433,7 +433,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** * @param m Mapping. */ - MiniFuture(GridDistributedTxMapping<K, V> m) { + MiniFuture(GridDistributedTxMapping m) { super(cctx.kernalContext()); this.m = m; @@ -456,7 +456,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** * @return Keys. */ - public GridDistributedTxMapping<K, V> mapping() { + public GridDistributedTxMapping mapping() { return m; } @@ -485,7 +485,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** * @param res Result callback. */ - void onResult(GridNearTxFinishResponse<K, V> res) { + void onResult(GridNearTxFinishResponse res) { if (res.error() != null) onDone(res.error()); else