http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 0be5b97..fc99639 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -37,8 +37,8 @@ import java.util.concurrent.atomic.*; /** * */ -public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Collection<GridCacheEntryInfo<K, V>>> - implements GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> { +public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Collection<GridCacheEntryInfo>> + implements GridDhtFuture<Collection<GridCacheEntryInfo>> { /** */ private static final long serialVersionUID = 0L; @@ -61,7 +61,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col private GridCacheContext<K, V> cctx; /** Keys. */ - private LinkedHashMap<? extends K, Boolean> keys; + private LinkedHashMap<KeyCacheObject, Boolean> keys; /** Reserved partitions. */ private Collection<GridDhtLocalPartition> parts = new GridLeanSet<>(5); @@ -76,7 +76,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col private long topVer; /** Transaction. */ - private IgniteTxLocalEx<K, V> tx; + private IgniteTxLocalEx tx; /** Logger. */ private IgniteLogger log; @@ -125,10 +125,10 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col GridCacheContext<K, V> cctx, long msgId, UUID reader, - LinkedHashMap<? extends K, Boolean> keys, + LinkedHashMap<KeyCacheObject, Boolean> keys, boolean readThrough, boolean reload, - @Nullable IgniteTxLocalEx<K, V> tx, + @Nullable IgniteTxLocalEx tx, long topVer, @Nullable UUID subjId, int taskNameHash, @@ -136,7 +136,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals ) { - super(cctx.kernalContext(), CU.<GridCacheEntryInfo<K, V>>collectionsReducer()); + super(cctx.kernalContext(), CU.<GridCacheEntryInfo>collectionsReducer()); assert reader != null; assert !F.isEmpty(keys); @@ -176,7 +176,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col /** * @return Keys. */ - Collection<? extends K> keys() { + Collection<KeyCacheObject> keys() { return keys.keySet(); } @@ -200,7 +200,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } /** {@inheritDoc} */ - @Override public boolean onDone(Collection<GridCacheEntryInfo<K, V>> res, Throwable err) { + @Override public boolean onDone(Collection<GridCacheEntryInfo> res, Throwable err) { if (super.onDone(res, err)) { // Release all partitions reserved by this future. for (GridDhtLocalPartition part : parts) @@ -215,15 +215,15 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col /** * @param keys Keys. */ - private void map(final LinkedHashMap<? extends K, Boolean> keys) { + private void map(final LinkedHashMap<KeyCacheObject, Boolean> keys) { GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer); if (!F.isEmpty(fut.invalidPartitions())) retries.addAll(fut.invalidPartitions()); add(new GridEmbeddedFuture<>(cctx.kernalContext(), fut, - new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo<K, V>>>() { - @Override public Collection<GridCacheEntryInfo<K, V>> apply(Object o, Exception e) { + new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() { + @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) { if (e != null) { // Check error first. if (log.isDebugEnabled()) log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']'); @@ -231,10 +231,10 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col onDone(e); } - LinkedHashMap<K, Boolean> mappedKeys = U.newLinkedHashMap(keys.size()); + LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = U.newLinkedHashMap(keys.size()); // Assign keys to primary nodes. - for (Map.Entry<? extends K, Boolean> key : keys.entrySet()) { + for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) { int part = cctx.affinity().partition(key.getKey()); if (!retries.contains(part)) { @@ -260,7 +260,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @param parts Parts to map. * @return {@code True} if mapped. */ - private boolean map(K key, Collection<GridDhtLocalPartition> parts) { + private boolean map(KeyCacheObject key, Collection<GridDhtLocalPartition> parts) { GridDhtLocalPartition part = topVer > 0 ? cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) : cache().topology().localPartition(key, false); @@ -287,12 +287,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @return Future for local get. */ @SuppressWarnings( {"unchecked", "IfMayBeConditional"}) - private IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> getAsync(final LinkedHashMap<? extends K, Boolean> keys) { + private IgniteInternalFuture<Collection<GridCacheEntryInfo>> getAsync( + final LinkedHashMap<KeyCacheObject, Boolean> keys) + { if (F.isEmpty(keys)) - return new GridFinishedFuture<Collection<GridCacheEntryInfo<K, V>>>(cctx.kernalContext(), - Collections.<GridCacheEntryInfo<K, V>>emptyList()); + return new GridFinishedFuture<Collection<GridCacheEntryInfo>>(cctx.kernalContext(), + Collections.<GridCacheEntryInfo>emptyList()); - final Collection<GridCacheEntryInfo<K, V>> infos = new LinkedList<>(); + final Collection<GridCacheEntryInfo> infos = new LinkedList<>(); String taskName0 = ctx.job().currentTaskName(); @@ -303,12 +305,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col GridCompoundFuture<Boolean, Boolean> txFut = null; - for (Map.Entry<? extends K, Boolean> k : keys.entrySet()) { + for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) { while (true) { - GridDhtCacheEntry<K, V> e = cache().entryExx(k.getKey(), topVer); + GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer); try { - GridCacheEntryInfo<K, V> info = e.info(); + GridCacheEntryInfo info = e.info(); // If entry is obsolete. if (info == null) @@ -352,31 +354,33 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col IgniteInternalFuture<Map<K, V>> fut; if (txFut == null || txFut.isDone()) { - if (reload && cctx.readThrough() && cctx.store().configured()) { - fut = cache().reloadAllAsync(keys.keySet(), - true, - skipVals, - subjId, - taskName); - } - else { - if (tx == null) { - fut = cache().getDhtAllAsync(keys.keySet(), - readThrough, - subjId, - taskName, - deserializePortable, - expiryPlc, - skipVals); - } - else { - fut = tx.getAllAsync(cctx, - keys.keySet(), - null, - deserializePortable, - skipVals); - } - } + fut = null; +// TODO IGNITE-51. +// if (reload && cctx.readThrough() && cctx.store().configured()) { +// fut = cache().reloadAllAsync(keys.keySet(), +// true, +// skipVals, +// subjId, +// taskName); +// } +// else { +// if (tx == null) { +// fut = cache().getDhtAllAsync(keys.keySet(), +// readThrough, +// subjId, +// taskName, +// deserializePortable, +// expiryPlc, +// skipVals); +// } +// else { +// fut = tx.getAllAsync(cctx, +// keys.keySet(), +// null, +// deserializePortable, +// skipVals); +// } +// } } else { // If we are here, then there were active transactions for some entries @@ -386,56 +390,59 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col txFut, new C2<Boolean, Exception, IgniteInternalFuture<Map<K, V>>>() { @Override public IgniteInternalFuture<Map<K, V>> apply(Boolean b, Exception e) { - if (e != null) - throw new GridClosureException(e); - - if (reload && cctx.readThrough() && cctx.store().configured()) { - return cache().reloadAllAsync(keys.keySet(), - true, - skipVals, - subjId, - taskName); - } - else { - if (tx == null) { - return cache().getDhtAllAsync(keys.keySet(), - readThrough, - subjId, - taskName, - deserializePortable, - expiryPlc, skipVals); - } - else { - return tx.getAllAsync(cctx, - keys.keySet(), - null, - deserializePortable, - skipVals); - } - } + return null; +// TODO IGNITE-51. +// if (e != null) +// throw new GridClosureException(e); +// +// if (reload && cctx.readThrough() && cctx.store().configured()) { +// return cache().reloadAllAsync(keys.keySet(), +// true, +// skipVals, +// subjId, +// taskName); +// } +// else { +// if (tx == null) { +// return cache().getDhtAllAsync(keys.keySet(), +// readThrough, +// subjId, +// taskName, +// deserializePortable, +// expiryPlc, skipVals); +// } +// else { +// return tx.getAllAsync(cctx, +// keys.keySet(), +// null, +// deserializePortable, +// skipVals); +// } +// } } }, cctx.kernalContext()); } return new GridEmbeddedFuture<>(cctx.kernalContext(), fut, - new C2<Map<K, V>, Exception, Collection<GridCacheEntryInfo<K, V>>>() { - @Override public Collection<GridCacheEntryInfo<K, V>> apply(Map<K, V> map, Exception e) { + new C2<Map<K, V>, Exception, Collection<GridCacheEntryInfo>>() { + @Override public Collection<GridCacheEntryInfo> apply(Map<K, V> map, Exception e) { if (e != null) { onDone(e); return Collections.emptyList(); } else { - for (Iterator<GridCacheEntryInfo<K, V>> it = infos.iterator(); it.hasNext();) { - GridCacheEntryInfo<K, V> info = it.next(); + for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) { + GridCacheEntryInfo info = it.next(); - V v = map.get(info.key()); + // TODO IGNITE-51. + V v = map.get(info.key().value(cctx)); if (v == null) it.remove(); else - info.value(v); + info.value(cctx.toCacheObject(v)); } return infos;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index dc7f8d7..844d700 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -48,7 +48,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*; * Cache lock future. */ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> - implements GridCacheMvccFuture<K, V, Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion { + implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion { /** */ private static final long serialVersionUID = 0L; @@ -74,10 +74,10 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo /** Keys locked so far. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) @GridToStringExclude - private List<GridDhtCacheEntry<K, V>> entries; + private List<GridDhtCacheEntry> entries; /** DHT mappings. */ - private Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtMap = + private Map<ClusterNode, List<GridDhtCacheEntry>> dhtMap = new ConcurrentHashMap8<>(); /** Future ID. */ @@ -110,7 +110,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo private IgnitePredicate<Cache.Entry<K, V>>[] filter; /** Transaction. */ - private GridDhtTxLocalAdapter<K, V> tx; + private GridDhtTxLocalAdapter tx; /** All replies flag. */ private AtomicBoolean mapped = new AtomicBoolean(false); @@ -125,7 +125,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo private final Object mux = new Object(); /** Pending locks. */ - private final Collection<K> pendingLocks = new GridConcurrentHashSet<>(); + private final Collection<KeyCacheObject> pendingLocks = new GridConcurrentHashSet<>(); /** TTL for read operation. */ private long accessTtl; @@ -158,7 +158,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo int cnt, boolean read, long timeout, - GridDhtTxLocalAdapter<K, V> tx, + GridDhtTxLocalAdapter tx, long threadId, long accessTtl, IgnitePredicate<Cache.Entry<K, V>>[] filter) { @@ -259,14 +259,14 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo /** * @return Entries. */ - public Collection<GridDhtCacheEntry<K, V>> entries() { + public Collection<GridDhtCacheEntry> entries() { return F.view(entries, F.notNull()); } /** * @return Entries. */ - public Collection<GridDhtCacheEntry<K, V>> entriesCopy() { + public Collection<GridDhtCacheEntry> entriesCopy() { synchronized (mux) { return new ArrayList<>(entries()); } @@ -346,7 +346,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @throws GridCacheEntryRemovedException If entry was removed. * @throws GridDistributedLockCancelledException If lock is canceled. */ - @Nullable public GridCacheMvccCandidate addEntry(GridDhtCacheEntry<K, V> entry) + @Nullable public GridCacheMvccCandidate addEntry(GridDhtCacheEntry entry) throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { if (log.isDebugEnabled()) log.debug("Adding entry: " + entry); @@ -404,12 +404,12 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo */ private void undoLocks(boolean dist) { // Transactions will undo during rollback. - Collection<GridDhtCacheEntry<K, V>> entriesCp = entriesCopy(); + Collection<GridDhtCacheEntry> entriesCp = entriesCopy(); if (dist && tx == null) { cctx.dhtTx().removeLocks(nearNodeId, lockVer, F.viewReadOnly(entriesCp, - new C1<GridDhtCacheEntry<K, V>, K>() { - @Override public K apply(GridDhtCacheEntry<K, V> e) { + new C1<GridDhtCacheEntry, KeyCacheObject>() { + @Override public KeyCacheObject apply(GridDhtCacheEntry e) { return e.key(); } }), false); @@ -486,7 +486,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param nodeId Sender. * @param res Result. */ - void onResult(UUID nodeId, GridDhtLockResponse<K, V> res) { + void onResult(UUID nodeId, GridDhtLockResponse res) { if (!isDone()) { if (log.isDebugEnabled()) log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']'); @@ -532,7 +532,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo for (int i = 0; i < entries.size(); i++) { while (true) { - GridDistributedCacheEntry<K, V> entry = entries.get(i); + GridDistributedCacheEntry entry = entries.get(i); if (entry == null) break; // While. @@ -563,7 +563,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo if (log.isDebugEnabled()) log.debug("Failed to ready lock because entry was removed (will renew)."); - entries.set(i, (GridDhtCacheEntry<K, V>)cctx.cache().entryEx(entry.key(), topVer)); + entries.set(i, (GridDhtCacheEntry)cctx.cache().entryEx(entry.key(), topVer)); } } } @@ -729,7 +729,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo /** * @param entries Entries. */ - private void map(Iterable<GridDhtCacheEntry<K, V>> entries) { + private void map(Iterable<GridDhtCacheEntry> entries) { if (!mapped.compareAndSet(false, true)) { if (log.isDebugEnabled()) log.debug("Will not map DHT lock future (other thread is mapping): " + this); @@ -744,7 +744,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo boolean hasRmtNodes = false; // Assign keys to primary nodes. - for (GridDhtCacheEntry<K, V> entry : entries) { + for (GridDhtCacheEntry entry : entries) { try { while (true) { try { @@ -789,10 +789,10 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo log.debug("Mapped DHT lock future [dhtMap=" + F.nodeIds(dhtMap.keySet()) + ", dhtLockFut=" + this + ']'); // Create mini futures. - for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapped : dhtMap.entrySet()) { + for (Map.Entry<ClusterNode, List<GridDhtCacheEntry>> mapped : dhtMap.entrySet()) { ClusterNode n = mapped.getKey(); - List<GridDhtCacheEntry<K, V>> dhtMapping = mapped.getValue(); + List<GridDhtCacheEntry> dhtMapping = mapped.getValue(); int cnt = F.size(dhtMapping); @@ -801,7 +801,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo MiniFuture fut = new MiniFuture(n, dhtMapping); - GridDhtLockRequest<K, V> req = new GridDhtLockRequest<>( + GridDhtLockRequest req = new GridDhtLockRequest( cctx.cacheId(), nearNodeId, inTx() ? tx.nearXidVersion() : null, @@ -825,8 +825,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo read ? accessTtl : -1L); try { - for (ListIterator<GridDhtCacheEntry<K, V>> it = dhtMapping.listIterator(); it.hasNext();) { - GridDhtCacheEntry<K, V> e = it.next(); + for (ListIterator<GridDhtCacheEntry> it = dhtMapping.listIterator(); it.hasNext();) { + GridDhtCacheEntry e = it.next(); // Must unswap entry so that isNewLocked returns correct value. e.unswap(true, false); @@ -897,7 +897,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @return Entry. * @throws IgniteCheckedException If failed. */ - private GridDhtCacheEntry<K, V> addOwned(GridDhtLockRequest<K, V> req, GridDhtCacheEntry<K, V> e) + private GridDhtCacheEntry addOwned(GridDhtLockRequest req, GridDhtCacheEntry e) throws IgniteCheckedException { while (true) { try { @@ -977,7 +977,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo /** DHT mapping. */ @GridToStringInclude - private List<GridDhtCacheEntry<K, V>> dhtMapping; + private List<GridDhtCacheEntry> dhtMapping; /** * Empty constructor required for {@link Externalizable}. @@ -990,7 +990,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param node Node. * @param dhtMapping Mapping. */ - MiniFuture(ClusterNode node, List<GridDhtCacheEntry<K, V>> dhtMapping) { + MiniFuture(ClusterNode node, List<GridDhtCacheEntry> dhtMapping) { super(cctx.kernalContext()); assert node != null; @@ -1040,7 +1040,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo /** * @param res Result callback. */ - void onResult(GridDhtLockResponse<K, V> res) { + void onResult(GridDhtLockResponse res) { if (res.error() != null) // Fail the whole compound future. onError(res.error()); @@ -1049,8 +1049,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo // Removing mappings for invalid partitions. if (!F.isEmpty(invalidParts)) { - for (Iterator<GridDhtCacheEntry<K, V>> it = dhtMapping.iterator(); it.hasNext();) { - GridDhtCacheEntry<K, V> entry = it.next(); + for (Iterator<GridDhtCacheEntry> it = dhtMapping.iterator(); it.hasNext();) { + GridDhtCacheEntry entry = it.next(); if (invalidParts.contains(entry.partition())) { it.remove(); @@ -1072,11 +1072,11 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo boolean rec = cctx.events().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED); - for (GridCacheEntryInfo<K, V> info : res.preloadEntries()) { + for (GridCacheEntryInfo info : res.preloadEntries()) { try { - GridCacheEntryEx<K,V> entry = cctx.cache().entryEx(info.key(), topVer); + GridCacheEntryEx entry = cctx.cache().entryEx(info.key(), topVer); - if (entry.initialValue(info.value(), info.valueBytes(), info.version(), info.ttl(), + if (entry.initialValue(info.value(), null, info.version(), info.ttl(), info.expireTime(), true, topVer, replicate ? DR_PRELOAD : DR_NONE)) { if (rec && !entry.isInternal()) cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), @@ -1108,13 +1108,13 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param entries Entries to check. */ @SuppressWarnings({"ForLoopReplaceableByForEach"}) - private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey<K>> keys, UUID nodeId, long msgId, - @Nullable List<GridDhtCacheEntry<K, V>> entries) { + private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId, + @Nullable List<GridDhtCacheEntry> entries) { if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty()) return; - for (ListIterator<GridDhtCacheEntry<K, V>> it = entries.listIterator(); it.hasNext(); ) { - GridDhtCacheEntry<K, V> cached = it.next(); + for (ListIterator<GridDhtCacheEntry> it = entries.listIterator(); it.hasNext(); ) { + GridDhtCacheEntry cached = it.next(); if (keys.contains(cached.txKey())) { while (true) { @@ -1127,7 +1127,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo break; } catch (GridCacheEntryRemovedException ignore) { - GridDhtCacheEntry<K, V> e = cacheCtx.dht().peekExx(cached.key()); + GridDhtCacheEntry e = cacheCtx.dht().peekExx(cached.key()); if (e == null) break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index d9a20ae..f3b60c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -88,7 +88,7 @@ public interface GridDhtPartitionTopology<K, V> { * @throws GridDhtInvalidPartitionException If partition is evicted or absent and * does not belong to this node. */ - @Nullable public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create) + @Nullable public GridDhtLocalPartition localPartition(int p, long topVer, boolean create) throws GridDhtInvalidPartitionException; /** @@ -98,19 +98,19 @@ public interface GridDhtPartitionTopology<K, V> { * @throws GridDhtInvalidPartitionException If partition is evicted or absent and * does not belong to this node. */ - @Nullable public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) + @Nullable public GridDhtLocalPartition localPartition(Object key, boolean create) throws GridDhtInvalidPartitionException; /** * @return All local partitions by copying them into another list. */ - public List<GridDhtLocalPartition<K, V>> localPartitions(); + public List<GridDhtLocalPartition> localPartitions(); /** * * @return All current local partitions. */ - public Collection<GridDhtLocalPartition<K, V>> currentLocalPartitions(); + public Collection<GridDhtLocalPartition> currentLocalPartitions(); /** * @return Local IDs. @@ -159,12 +159,12 @@ public interface GridDhtPartitionTopology<K, V> { * @param e Entry added to cache. * @return Local partition. */ - public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e); + public GridDhtLocalPartition onAdded(long topVer, GridDhtCacheEntry e); /** * @param e Entry removed from cache. */ - public void onRemoved(GridDhtCacheEntry<K, V> e); + public void onRemoved(GridDhtCacheEntry e); /** * @param exchId Exchange ID. @@ -185,12 +185,12 @@ public interface GridDhtPartitionTopology<K, V> { * @param part Partition to own. * @return {@code True} if owned. */ - public boolean own(GridDhtLocalPartition<K, V> part); + public boolean own(GridDhtLocalPartition part); /** * @param part Evicted partition. */ - public void onEvicted(GridDhtLocalPartition<K, V> part, boolean updateSeq); + public void onEvicted(GridDhtLocalPartition part, boolean updateSeq); /** * @param nodeId Node to get partitions for. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index acf00eb..6794569 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -52,7 +52,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, private final IgniteLogger log; /** */ - private final ConcurrentMap<Integer, GridDhtLocalPartition<K, V>> locParts = + private final ConcurrentMap<Integer, GridDhtLocalPartition> locParts = new ConcurrentHashMap8<>(); /** Node to partition map. */ @@ -112,8 +112,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, boolean changed = false; // Synchronously wait for all renting partitions to complete. - for (Iterator<GridDhtLocalPartition<K, V>> it = locParts.values().iterator(); it.hasNext();) { - GridDhtLocalPartition<K, V> p = it.next(); + for (Iterator<GridDhtLocalPartition> it = locParts.values().iterator(); it.hasNext();) { + GridDhtLocalPartition p = it.next(); GridDhtPartitionState state = p.state(); @@ -251,7 +251,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, assert exchId.isJoined(); try { - GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, true, false); + GridDhtLocalPartition locPart = localPartition(p, topVer, true, false); assert locPart != null; @@ -278,7 +278,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, try { // This will make sure that all non-existing partitions // will be created in MOVING state. - GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, true, false); + GridDhtLocalPartition locPart = localPartition(p, topVer, true, false); updateLocal(p, loc.id(), locPart.state(), updateSeq); } @@ -308,7 +308,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, // If preloader is disabled, then we simply clear out // the partitions this node is not responsible for. for (int p = 0; p < num; p++) { - GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, false, false); + GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); boolean belongs = cctx.affinity().localNode(p, topVer); @@ -381,7 +381,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, long updateSeq = this.updateSeq.incrementAndGet(); for (int p = 0; p < num; p++) { - GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, false, false); + GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); if (cctx.affinity().localNode(p, topVer)) { // This partition will be created during next topology event, @@ -449,7 +449,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, } /** {@inheritDoc} */ - @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create) + @Nullable @Override public GridDhtLocalPartition localPartition(int p, long topVer, boolean create) throws GridDhtInvalidPartitionException { return localPartition(p, topVer, create, true); } @@ -461,11 +461,11 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, * @param updateSeq Update sequence. * @return Local partition. */ - private GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create, boolean updateSeq) { + private GridDhtLocalPartition localPartition(int p, long topVer, boolean create, boolean updateSeq) { while (true) { boolean belongs = cctx.affinity().localNode(p, topVer); - GridDhtLocalPartition<K, V> loc = locParts.get(p); + GridDhtLocalPartition loc = locParts.get(p); if (loc != null && loc.state() == EVICTED) { locParts.remove(p, loc); @@ -488,8 +488,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, lock.writeLock().lock(); try { - GridDhtLocalPartition<K, V> old = locParts.putIfAbsent(p, - loc = new GridDhtLocalPartition<>(cctx, p)); + GridDhtLocalPartition old = locParts.putIfAbsent(p, + loc = new GridDhtLocalPartition(cctx, p)); if (old != null) loc = old; @@ -511,22 +511,22 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) { + @Override public GridDhtLocalPartition localPartition(Object key, boolean create) { return localPartition(cctx.affinity().partition(key), -1, create); } /** {@inheritDoc} */ - @Override public List<GridDhtLocalPartition<K, V>> localPartitions() { + @Override public List<GridDhtLocalPartition> localPartitions() { return new LinkedList<>(locParts.values()); } /** {@inheritDoc} */ - @Override public Collection<GridDhtLocalPartition<K, V>> currentLocalPartitions() { + @Override public Collection<GridDhtLocalPartition> currentLocalPartitions() { return locParts.values(); } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e) { + @Override public GridDhtLocalPartition onAdded(long topVer, GridDhtCacheEntry e) { /* * Make sure not to acquire any locks here as this method * may be called from sensitive synchronization blocks. @@ -535,7 +535,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, int p = cctx.affinity().partition(e.key()); - GridDhtLocalPartition<K, V> loc = localPartition(p, topVer, true); + GridDhtLocalPartition loc = localPartition(p, topVer, true); assert loc != null; @@ -545,14 +545,14 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, } /** {@inheritDoc} */ - @Override public void onRemoved(GridDhtCacheEntry<K, V> e) { + @Override public void onRemoved(GridDhtCacheEntry e) { /* * Make sure not to acquire any locks here as this method * may be called from sensitive synchronization blocks. * =================================================== */ - GridDhtLocalPartition<K, V> loc = localPartition(cctx.affinity().partition(e.key()), topologyVersion(), false); + GridDhtLocalPartition loc = localPartition(cctx.affinity().partition(e.key()), topologyVersion(), false); if (loc != null) loc.onRemoved(e); @@ -904,7 +904,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, UUID locId = cctx.nodeId(); - for (GridDhtLocalPartition<K, V> part : locParts.values()) { + for (GridDhtLocalPartition part : locParts.values()) { GridDhtPartitionState state = part.state(); if (state.active()) { @@ -1064,7 +1064,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, } /** {@inheritDoc} */ - @Override public boolean own(GridDhtLocalPartition<K, V> part) { + @Override public boolean own(GridDhtLocalPartition part) { ClusterNode loc = cctx.localNode(); lock.writeLock().lock(); @@ -1088,7 +1088,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, } /** {@inheritDoc} */ - @Override public void onEvicted(GridDhtLocalPartition<K, V> part, boolean updateSeq) { + @Override public void onEvicted(GridDhtLocalPartition part, boolean updateSeq) { assert updateSeq || lock.isWriteLockedByCurrentThread(); lock.writeLock().lock(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 62c6515..9661da8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -73,7 +73,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param ctx Cache context. * @param map Cache map. */ - protected GridDhtTransactionalCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) { + protected GridDhtTransactionalCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { super(ctx, map); } @@ -85,38 +85,38 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach preldr.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest<K, V>>() { - @Override public void apply(UUID nodeId, GridNearGetRequest<K, V> req) { + ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { + @Override public void apply(UUID nodeId, GridNearGetRequest req) { processNearGetRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest<K, V>>() { - @Override public void apply(UUID nodeId, GridNearLockRequest<K, V> req) { + ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() { + @Override public void apply(UUID nodeId, GridNearLockRequest req) { processNearLockRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest<K, V>>() { - @Override public void apply(UUID nodeId, GridDhtLockRequest<K, V> req) { + ctx.io().addHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() { + @Override public void apply(UUID nodeId, GridDhtLockRequest req) { processDhtLockRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse<K, V>>() { - @Override public void apply(UUID nodeId, GridDhtLockResponse<K, V> req) { + ctx.io().addHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() { + @Override public void apply(UUID nodeId, GridDhtLockResponse req) { processDhtLockResponse(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest<K, V>>() { - @Override public void apply(UUID nodeId, GridNearUnlockRequest<K, V> req) { + ctx.io().addHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() { + @Override public void apply(UUID nodeId, GridNearUnlockRequest req) { processNearUnlockRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest<K, V>>() { - @Override public void apply(UUID nodeId, GridDhtUnlockRequest<K, V> req) { + ctx.io().addHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() { + @Override public void apply(UUID nodeId, GridDhtUnlockRequest req) { processDhtUnlockRequest(nodeId, req); } }); @@ -134,35 +134,35 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @throws GridDistributedLockCancelledException If lock has been cancelled. */ @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable GridDhtTxRemote<K, V> startRemoteTx(UUID nodeId, - GridDhtLockRequest<K, V> req, - GridDhtLockResponse<K, V> res) + @Nullable GridDhtTxRemote startRemoteTx(UUID nodeId, + GridDhtLockRequest req, + GridDhtLockResponse res) throws IgniteCheckedException, GridDistributedLockCancelledException { - List<K> keys = req.keys(); - GridDhtTxRemote<K, V> tx = null; + List<KeyCacheObject> keys = req.keys(); + GridDhtTxRemote tx = null; int size = F.size(keys); for (int i = 0; i < size; i++) { - K key = keys.get(i); + KeyCacheObject key = keys.get(i); if (key == null) continue; - IgniteTxKey<K> txKey = ctx.txKey(key); + IgniteTxKey txKey = ctx.txKey(key); assert F.isEmpty(req.candidatesByIndex(i)); if (log.isDebugEnabled()) log.debug("Unmarshalled key: " + key); - GridDistributedCacheEntry<K, V> entry = null; + GridDistributedCacheEntry entry = null; while (true) { try { int part = ctx.affinity().partition(key); - GridDhtLocalPartition<K, V> locPart = ctx.topology().localPartition(part, req.topologyVersion(), + GridDhtLocalPartition locPart = ctx.topology().localPartition(part, req.topologyVersion(), false); if (locPart == null || !locPart.reserve()) { @@ -186,7 +186,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach tx = ctx.tm().tx(req.version()); if (tx == null) { - tx = new GridDhtTxRemote<>( + tx = new GridDhtTxRemote( ctx.shared(), req.nodeId(), req.futureId(), @@ -249,7 +249,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (req.needPreloadKey(i)) { entry.unswap(); - GridCacheEntryInfo<K, V> info = entry.info(); + GridCacheEntryInfo info = entry.info(); if (info != null && !info.isNew() && !info.isDeleted()) res.addPreloadEntry(info); @@ -337,7 +337,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param nodeId Node ID. * @param req Request. */ - protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest<K, V> req) { + protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) { IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion()); @@ -356,7 +356,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param nodeId Node ID. * @param req Request. */ - protected final void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest<K, V> req) { + protected final void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) { assert nodeId != null; assert req != null; assert !nodeId.equals(locNodeId); @@ -367,16 +367,16 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach int cnt = F.size(req.keys()); - GridDhtLockResponse<K, V> res; + GridDhtLockResponse res; - GridDhtTxRemote<K, V> dhtTx = null; - GridNearTxRemote<K, V> nearTx = null; + GridDhtTxRemote dhtTx = null; + GridNearTxRemote nearTx = null; boolean fail = false; boolean cancelled = false; try { - res = new GridDhtLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), cnt); + res = new GridDhtLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), cnt); dhtTx = startRemoteTx(nodeId, req, res); nearTx = isNearEnabled(cacheCfg) ? near().startRemoteTx(nodeId, req) : null; @@ -385,10 +385,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach res.nearEvicted(nearTx.evicted()); else { if (!F.isEmpty(req.nearKeys())) { - Collection<IgniteTxKey<K>> nearEvicted = new ArrayList<>(req.nearKeys().size()); + Collection<IgniteTxKey> nearEvicted = new ArrayList<>(req.nearKeys().size()); - nearEvicted.addAll(F.viewReadOnly(req.nearKeys(), new C1<K, IgniteTxKey<K>>() { - @Override public IgniteTxKey<K> apply(K k) { + nearEvicted.addAll(F.viewReadOnly(req.nearKeys(), new C1<KeyCacheObject, IgniteTxKey>() { + @Override public IgniteTxKey apply(KeyCacheObject k) { return ctx.txKey(k); } })); @@ -402,7 +402,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach U.error(log, err, e); - res = new GridDhtLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), + res = new GridDhtLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), new IgniteTxRollbackCheckedException(err, e)); fail = true; @@ -412,7 +412,11 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach U.error(log, err, e); - res = new GridDhtLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), new IgniteCheckedException(err, e)); + res = new GridDhtLockResponse(ctx.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + new IgniteCheckedException(err, e)); fail = true; } @@ -454,12 +458,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (nearTx != null) // Even though this should never happen, we leave this check for consistency. nearTx.rollback(); - List<K> keys = req.keys(); + List<KeyCacheObject> keys = req.keys(); if (keys != null) { - for (K key : keys) { + for (KeyCacheObject key : keys) { while (true) { - GridDistributedCacheEntry<K, V> entry = peekExx(key); + GridDistributedCacheEntry entry = peekExx(key); try { if (entry != null) { @@ -487,7 +491,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } /** {@inheritDoc} */ - protected void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest<K, V> req) { + protected void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest req) { clearLocks(nodeId, req); if (isNearEnabled(cacheCfg)) @@ -498,7 +502,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param nodeId Node ID. * @param req Request. */ - private void processNearLockRequest(UUID nodeId, GridNearLockRequest<K, V> req) { + private void processNearLockRequest(UUID nodeId, GridNearLockRequest req) { assert isAffinityNode(cacheCfg); assert nodeId != null; assert req != null; @@ -528,7 +532,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param nodeId Node ID. * @param res Response. */ - private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse<K, V> res) { + private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) { assert nodeId != null; assert res != null; GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(), @@ -548,7 +552,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach @Override public IgniteInternalFuture<Boolean> lockAllAsync( @Nullable Collection<? extends K> keys, long timeout, - IgniteTxLocalEx<K, V> txx, + IgniteTxLocalEx txx, boolean isInvalidate, boolean isRead, boolean retval, @@ -583,7 +587,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach */ public GridDhtFuture<Boolean> lockAllAsyncInternal(@Nullable Collection<? extends K> keys, long timeout, - IgniteTxLocalEx<K, V> txx, + IgniteTxLocalEx txx, boolean isInvalidate, boolean isRead, boolean retval, @@ -593,7 +597,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (keys == null || keys.isEmpty()) return new GridDhtFinishedFuture<>(ctx.kernalContext(), true); - GridDhtTxLocalAdapter<K, V> tx = (GridDhtTxLocalAdapter<K, V>)txx; + GridDhtTxLocalAdapter tx = (GridDhtTxLocalAdapter)txx; assert tx != null; @@ -614,9 +618,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (key == null) continue; + // TODO IGNITE-51. + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); + try { while (true) { - GridDhtCacheEntry<K, V> entry = entryExx(key, tx.topologyVersion()); + GridDhtCacheEntry entry = entryExx(cacheKey, tx.topologyVersion()); try { fut.addEntry(entry); @@ -662,12 +669,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param filter0 Filter. * @return Future. */ - public IgniteInternalFuture<GridNearLockResponse<K, V>> lockAllAsync( + public IgniteInternalFuture<GridNearLockResponse> lockAllAsync( final GridCacheContext<K, V> cacheCtx, final ClusterNode nearNode, - final GridNearLockRequest<K, V> req, + final GridNearLockRequest req, @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter0) { - final List<K> keys = req.keys(); + final List<KeyCacheObject> keys = req.keys(); IgniteInternalFuture<Object> keyFut = null; @@ -687,15 +694,15 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach keyFut = new GridFinishedFutureEx<>(); return new GridEmbeddedFuture<>(true, keyFut, - new C2<Object, Exception, IgniteInternalFuture<GridNearLockResponse<K,V>>>() { - @Override public IgniteInternalFuture<GridNearLockResponse<K, V>> apply(Object o, Exception exx) { + new C2<Object, Exception, IgniteInternalFuture<GridNearLockResponse>>() { + @Override public IgniteInternalFuture<GridNearLockResponse> apply(Object o, Exception exx) { if (exx != null) return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx); IgnitePredicate<Cache.Entry<K, V>>[] filter = filter0; // Set message into thread context. - GridDhtTxLocal<K, V> tx = null; + GridDhtTxLocal tx = null; try { int cnt = keys.size(); @@ -711,7 +718,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Unmarshal filter first. if (filter == null) - filter = req.filter(); + filter = (IgnitePredicate[])req.filter(); GridDhtLockFuture<K, V> fut = null; @@ -735,14 +742,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach boolean timedout = false; - for (K key : keys) { + for (KeyCacheObject key : keys) { if (timedout) break; while (true) { // Specify topology version to make sure containment is checked // based on the requested version, not the latest. - GridDhtCacheEntry<K, V> entry = entryExx(key, req.topologyVersion()); + GridDhtCacheEntry entry = entryExx(key, req.topologyVersion()); try { if (fut != null) { @@ -780,7 +787,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Handle implicit locks for pessimistic transactions. if (req.inTx()) { if (tx == null) { - tx = new GridDhtTxLocal<>( + tx = new GridDhtTxLocal( ctx.shared(), nearNode.id(), req.version(), @@ -826,7 +833,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (log.isDebugEnabled()) log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']'); - IgniteInternalFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync( + IgniteInternalFuture<GridCacheReturn<Object>> txFut = tx.lockAllAsync( cacheCtx, entries, req.onePhaseCommit(), @@ -834,12 +841,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.txRead(), req.accessTtl()); - final GridDhtTxLocal<K, V> t = tx; + final GridDhtTxLocal t = tx; - return new GridDhtEmbeddedFuture<>( + return new GridDhtEmbeddedFuture( txFut, - new C2<GridCacheReturn<V>, Exception, IgniteInternalFuture<GridNearLockResponse<K, V>>>() { - @Override public IgniteInternalFuture<GridNearLockResponse<K, V>> apply( + new C2<GridCacheReturn<V>, Exception, IgniteInternalFuture<GridNearLockResponse>>() { + @Override public IgniteInternalFuture<GridNearLockResponse> apply( GridCacheReturn<V> o, Exception e) { if (e != null) e = U.unwrap(e); @@ -847,7 +854,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert !t.empty(); // Create response while holding locks. - final GridNearLockResponse<K, V> resp = createLockReply(nearNode, + final GridNearLockResponse resp = createLockReply(nearNode, entries, req, t, @@ -858,8 +865,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert t.implicit(); return t.commitAsync().chain( - new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse<K, V>>() { - @Override public GridNearLockResponse<K, V> apply(IgniteInternalFuture<IgniteInternalTx> f) { + new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() { + @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) { try { // Check for error. f.get(); @@ -894,14 +901,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach return new GridDhtEmbeddedFuture<>( ctx.kernalContext(), fut, - new C2<Boolean, Exception, GridNearLockResponse<K, V>>() { - @Override public GridNearLockResponse<K, V> apply(Boolean b, Exception e) { + new C2<Boolean, Exception, GridNearLockResponse>() { + @Override public GridNearLockResponse apply(Boolean b, Exception e) { if (e != null) e = U.unwrap(e); else if (!b) e = new GridCacheLockTimeoutException(req.version()); - GridNearLockResponse<K, V> res = createLockReply(nearNode, + GridNearLockResponse res = createLockReply(nearNode, entries, req, null, @@ -946,11 +953,11 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param err Error. * @return Response. */ - private GridNearLockResponse<K, V> createLockReply( + private GridNearLockResponse createLockReply( ClusterNode nearNode, List<GridCacheEntryEx> entries, - GridNearLockRequest<K, V> req, - @Nullable GridDhtTxLocalAdapter<K,V> tx, + GridNearLockRequest req, + @Nullable GridDhtTxLocalAdapter tx, GridCacheVersion mappedVer, Throwable err) { assert mappedVer != null; @@ -958,7 +965,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach try { // Send reply back to originating near node. - GridNearLockResponse<K, V> res = new GridNearLockResponse<>(ctx.cacheId(), + GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err); if (err == null) { @@ -987,7 +994,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver); - V val = null; + CacheObject val = null; if (ret) val = e.innerGet(tx, @@ -1027,7 +1034,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // We include values into response since they are required for local // calls and won't be serialized. We are also including DHT version. res.addValueBytes( - val != null ? val : (V)valBytes.getIfPlain(), + val, + // TODO IGNITE-51 + // val != null ? val : (V)valBytes.getIfPlain(), ret ? valBytes.getIfMarshaled() : null, filterPassed, ver, @@ -1069,7 +1078,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach U.error(log, "Failed to get value for lock reply message for node [node=" + U.toShortString(nearNode) + ", req=" + req + ']', e); - return new GridNearLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false, + return new GridNearLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false, entries.size(), e); } } @@ -1084,9 +1093,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach */ private void sendLockReply( ClusterNode nearNode, - @Nullable IgniteInternalTx<K,V> tx, - GridNearLockRequest<K, V> req, - GridNearLockResponse<K, V> res + @Nullable IgniteInternalTx tx, + GridNearLockRequest req, + GridNearLockResponse res ) { Throwable err = res.error(); @@ -1150,15 +1159,15 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param req Request. */ @SuppressWarnings({"RedundantTypeArguments"}) - private void clearLocks(UUID nodeId, GridDistributedUnlockRequest<K, V> req) { + private void clearLocks(UUID nodeId, GridDistributedUnlockRequest req) { assert nodeId != null; - List<K> keys = req.keys(); + List<KeyCacheObject> keys = req.keys(); if (keys != null) { - for (K key : keys) { + for (KeyCacheObject key : keys) { while (true) { - GridDistributedCacheEntry<K, V> entry = peekExx(key); + GridDistributedCacheEntry entry = peekExx(key); boolean created = false; @@ -1212,7 +1221,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param req Request. */ @SuppressWarnings({"RedundantTypeArguments", "TypeMayBeWeakened"}) - private void processNearUnlockRequest(UUID nodeId, GridNearUnlockRequest<K, V> req) { + private void processNearUnlockRequest(UUID nodeId, GridNearUnlockRequest req) { assert isAffinityNode(cacheCfg); assert nodeId != null; @@ -1230,10 +1239,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach */ private void map(UUID nodeId, long topVer, - GridCacheEntryEx<K,V> cached, + GridCacheEntryEx cached, Collection<UUID> readers, - Map<ClusterNode, List<T2<K, byte[]>>> dhtMap, - Map<ClusterNode, List<T2<K, byte[]>>> nearMap) + Map<ClusterNode, List<T2<KeyCacheObject, byte[]>>> dhtMap, + Map<ClusterNode, List<T2<KeyCacheObject, byte[]>>> nearMap) throws IgniteCheckedException { Collection<ClusterNode> dhtNodes = ctx.dht().topology().nodes(cached.partition(), topVer); @@ -1279,10 +1288,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach @SuppressWarnings( {"MismatchedQueryAndUpdateOfCollection"}) private void map(GridCacheEntryEx entry, @Nullable Iterable<? extends ClusterNode> nodes, - Map<ClusterNode, List<T2<K, byte[]>>> map) throws IgniteCheckedException { + Map<ClusterNode, List<T2<KeyCacheObject, byte[]>>> map) throws IgniteCheckedException { if (nodes != null) { for (ClusterNode n : nodes) { - List<T2<K, byte[]>> keys = map.get(n); + List<T2<KeyCacheObject, byte[]>> keys = map.get(n); if (keys == null) map.put(n, keys = new LinkedList<>()); @@ -1298,7 +1307,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param keys Keys. * @param unmap Flag for un-mapping version. */ - public void removeLocks(UUID nodeId, GridCacheVersion ver, Iterable<? extends K> keys, boolean unmap) { + public void removeLocks(UUID nodeId, GridCacheVersion ver, Iterable<KeyCacheObject> keys, boolean unmap) { assert nodeId != null; assert ver != null; @@ -1308,16 +1317,16 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Remove mapped versions. GridCacheVersion dhtVer = unmap ? ctx.mvcc().unmapVersion(ver) : ver; - Map<ClusterNode, List<T2<K, byte[]>>> dhtMap = new HashMap<>(); - Map<ClusterNode, List<T2<K, byte[]>>> nearMap = new HashMap<>(); + Map<ClusterNode, List<T2<KeyCacheObject, byte[]>>> dhtMap = new HashMap<>(); + Map<ClusterNode, List<T2<KeyCacheObject, byte[]>>> nearMap = new HashMap<>(); GridCacheVersion obsoleteVer = null; - for (K key : keys) { + for (KeyCacheObject key : keys) { while (true) { boolean created = false; - GridDhtCacheEntry<K, V> entry = peekExx(key); + GridDhtCacheEntry entry = peekExx(key); if (entry == null) { entry = entryExx(key); @@ -1397,23 +1406,23 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver); // Backups. - for (Map.Entry<ClusterNode, List<T2<K, byte[]>>> entry : dhtMap.entrySet()) { + for (Map.Entry<ClusterNode, List<T2<KeyCacheObject, byte[]>>> entry : dhtMap.entrySet()) { ClusterNode n = entry.getKey(); - List<T2<K, byte[]>> keyBytes = entry.getValue(); + List<T2<KeyCacheObject, byte[]>> keyBytes = entry.getValue(); - GridDhtUnlockRequest<K, V> req = new GridDhtUnlockRequest<>(ctx.cacheId(), keyBytes.size()); + GridDhtUnlockRequest req = new GridDhtUnlockRequest(ctx.cacheId(), keyBytes.size()); req.version(dhtVer); try { - for (T2<K, byte[]> key : keyBytes) + for (T2<KeyCacheObject, byte[]> key : keyBytes) req.addKey(key.get1(), key.get2(), ctx); keyBytes = nearMap.get(n); if (keyBytes != null) - for (T2<K, byte[]> key : keyBytes) + for (T2<KeyCacheObject, byte[]> key : keyBytes) req.addNearKey(key.get1(), key.get2(), ctx.shared()); req.completedVersions(committed, rolledback); @@ -1430,18 +1439,18 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } // Readers. - for (Map.Entry<ClusterNode, List<T2<K, byte[]>>> entry : nearMap.entrySet()) { + for (Map.Entry<ClusterNode, List<T2<KeyCacheObject, byte[]>>> entry : nearMap.entrySet()) { ClusterNode n = entry.getKey(); if (!dhtMap.containsKey(n)) { - List<T2<K, byte[]>> keyBytes = entry.getValue(); + List<T2<KeyCacheObject, byte[]>> keyBytes = entry.getValue(); - GridDhtUnlockRequest<K, V> req = new GridDhtUnlockRequest<>(ctx.cacheId(), keyBytes.size()); + GridDhtUnlockRequest req = new GridDhtUnlockRequest(ctx.cacheId(), keyBytes.size()); req.version(dhtVer); try { - for (T2<K, byte[]> key : keyBytes) + for (T2<KeyCacheObject, byte[]> key : keyBytes) req.addNearKey(key.get1(), key.get2(), ctx.shared()); req.completedVersions(committed, rolledback); @@ -1464,7 +1473,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param ver Version. * @throws IgniteCheckedException If invalidate failed. */ - private void invalidateNearEntry(K key, GridCacheVersion ver) throws IgniteCheckedException { + private void invalidateNearEntry(KeyCacheObject key, GridCacheVersion ver) throws IgniteCheckedException { GridCacheEntryEx nearEntry = near().peekEx(key); if (nearEntry != null) @@ -1475,7 +1484,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param key Key * @param ver Version. */ - private void obsoleteNearEntry(K key, GridCacheVersion ver) { + private void obsoleteNearEntry(KeyCacheObject key, GridCacheVersion ver) { GridCacheEntryEx nearEntry = near().peekEx(key); if (nearEntry != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 7dac17b..d345daf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -58,7 +58,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** Transaction. */ @GridToStringExclude - private GridDhtTxLocalAdapter<K, V> tx; + private GridDhtTxLocalAdapter tx; /** Commit flag. */ private boolean commit; @@ -71,10 +71,10 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur private AtomicReference<Throwable> err = new AtomicReference<>(null); /** DHT mappings. */ - private Map<UUID, GridDistributedTxMapping<K, V>> dhtMap; + private Map<UUID, GridDistributedTxMapping> dhtMap; /** Near mappings. */ - private Map<UUID, GridDistributedTxMapping<K, V>> nearMap; + private Map<UUID, GridDistributedTxMapping> nearMap; /** Trackable flag. */ private boolean trackable = true; @@ -91,7 +91,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @param tx Transaction. * @param commit Commit flag. */ - public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter<K, V> tx, boolean commit) { + public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter tx, boolean commit) { super(cctx.kernalContext(), F.<IgniteInternalTx>identityReducer(tx)); this.cctx = cctx; @@ -196,7 +196,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @param nodeId Sender. * @param res Result. */ - public void onResult(UUID nodeId, GridDhtTxFinishResponse<K, V> res) { + public void onResult(UUID nodeId, GridDhtTxFinishResponse res) { if (!isDone()) { for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) { if (isMini(fut)) { @@ -281,8 +281,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @param nearMap Near map. * @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for. */ - private boolean finish(Map<UUID, GridDistributedTxMapping<K, V>> dhtMap, - Map<UUID, GridDistributedTxMapping<K, V>> nearMap) { + private boolean finish(Map<UUID, GridDistributedTxMapping> dhtMap, + Map<UUID, GridDistributedTxMapping> nearMap) { if (tx.onePhaseCommit()) return false; @@ -291,12 +291,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur boolean sync = commit ? tx.syncCommit() : tx.syncRollback(); // Create mini futures. - for (GridDistributedTxMapping<K, V> dhtMapping : dhtMap.values()) { + for (GridDistributedTxMapping dhtMapping : dhtMap.values()) { ClusterNode n = dhtMapping.node(); assert !n.isLocal(); - GridDistributedTxMapping<K, V> nearMapping = nearMap.get(n.id()); + GridDistributedTxMapping nearMapping = nearMap.get(n.id()); if (dhtMapping.empty() && nearMapping != null && nearMapping.empty()) // Nothing to send. @@ -306,7 +306,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur add(fut); // Append new future. - GridDhtTxFinishRequest<K, V> req = new GridDhtTxFinishRequest<>( + GridDhtTxFinishRequest req = new GridDhtTxFinishRequest( tx.nearNodeId(), futId, fut.futureId(), @@ -347,7 +347,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur } } - for (GridDistributedTxMapping<K, V> nearMapping : nearMap.values()) { + for (GridDistributedTxMapping nearMapping : nearMap.values()) { if (!dhtMap.containsKey(nearMapping.node().id())) { if (nearMapping.empty()) // Nothing to send. @@ -357,7 +357,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur add(fut); // Append new future. - GridDhtTxFinishRequest<K, V> req = new GridDhtTxFinishRequest<>( + GridDhtTxFinishRequest req = new GridDhtTxFinishRequest( tx.nearNodeId(), futId, fut.futureId(), @@ -423,11 +423,11 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** DHT mapping. */ @GridToStringInclude - private GridDistributedTxMapping<K, V> dhtMapping; + private GridDistributedTxMapping dhtMapping; /** Near mapping. */ @GridToStringInclude - private GridDistributedTxMapping<K, V> nearMapping; + private GridDistributedTxMapping nearMapping; /** * Empty constructor required for {@link Externalizable}. @@ -440,7 +440,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @param dhtMapping Mapping. * @param nearMapping nearMapping. */ - MiniFuture(GridDistributedTxMapping<K, V> dhtMapping, GridDistributedTxMapping<K, V> nearMapping) { + MiniFuture(GridDistributedTxMapping dhtMapping, GridDistributedTxMapping nearMapping) { super(cctx.kernalContext()); assert dhtMapping == null || nearMapping == null || dhtMapping.node() == nearMapping.node(); @@ -488,7 +488,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** * @param res Result callback. */ - void onResult(GridDhtTxFinishResponse<K, V> res) { + void onResult(GridDhtTxFinishResponse res) { if (log.isDebugEnabled()) log.debug("Transaction synchronously completed on node [node=" + node() + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 6c4fd56..a5e98ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -67,7 +67,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** Future. */ @GridToStringExclude - private final AtomicReference<GridDhtTxPrepareFuture<K, V>> prepFut = + private final AtomicReference<GridDhtTxPrepareFuture> prepFut = new AtomicReference<>(); /** @@ -213,7 +213,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** {@inheritDoc} */ - @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) { + @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, long topVer) { return cacheCtx.isDht() && isNearEnabled(cacheCtx) && !cctx.localNodeId().equals(nearNodeId()); } @@ -246,11 +246,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** {@inheritDoc} */ - @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, + @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry cached, IgniteTxEntry entry, long topVer) { // Don't add local node as reader. if (!cctx.localNodeId().equals(nearNodeId)) { - GridCacheContext<K, V> cacheCtx = cached.context(); + GridCacheContext cacheCtx = cached.context(); while (true) { try { @@ -283,7 +283,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa return prepareAsync( null, null, - Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), + Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), 0, nearMiniId, null, @@ -293,7 +293,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } // For pessimistic mode we don't distribute prepare request. - GridDhtTxPrepareFuture<K, V> fut = prepFut.get(); + GridDhtTxPrepareFuture fut = prepFut.get(); if (fut == null) { // Future must be created before any exception can be thrown. @@ -301,7 +301,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa cctx, this, nearMiniId, - Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), + Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), true, needReturnValue(), null, @@ -390,7 +390,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa needReturnValue(), lastBackups, completeCb))) { - GridDhtTxPrepareFuture<K, V> f = prepFut.get(); + GridDhtTxPrepareFuture f = prepFut.get(); assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; @@ -480,11 +480,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (pessimistic()) prepareAsync(); - final GridDhtTxFinishFuture<K, V> fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true); + final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true); cctx.mvcc().addFuture(fut); - GridDhtTxPrepareFuture<K, V> prep = prepFut.get(); + GridDhtTxPrepareFuture prep = prepFut.get(); if (prep != null) { if (prep.isDone()) { @@ -560,7 +560,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** {@inheritDoc} */ - @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture<K, V> fut) { + @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) { assert optimistic(); prepFut.compareAndSet(fut, null); @@ -568,9 +568,9 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** {@inheritDoc} */ @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() { - GridDhtTxPrepareFuture<K, V> prepFut = this.prepFut.get(); + GridDhtTxPrepareFuture prepFut = this.prepFut.get(); - final GridDhtTxFinishFuture<K, V> fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); + final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); cctx.mvcc().addFuture(fut); @@ -651,7 +651,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa return; } - GridNearTxFinishResponse<K, V> res = new GridNearTxFinishResponse<>(nearXidVer, threadId, nearFinFutId, + GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer, threadId, nearFinFutId, nearFinMiniId, err); try {