http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index c8aac7c..78f2557 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -53,7 +53,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap private static final long serialVersionUID = 0L; /** Topology. */ - private GridDhtPartitionTopology<K, V> top; + private GridDhtPartitionTopology top; /** Preloader. */ protected GridCachePreloader<K, V> preldr; @@ -156,7 +156,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * @return Partition topology. */ - public GridDhtPartitionTopology<K, V> topology() { + public GridDhtPartitionTopology topology() { return top; } @@ -301,8 +301,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. */ @Override public GridCacheEntryEx entryEx(KeyCacheObject key, boolean touch) - throws GridDhtInvalidPartitionException - { + throws GridDhtInvalidPartitionException { return super.entryEx(key, touch); }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index bb80c17..6226f09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -306,14 +306,14 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { * @throws GridCacheEntryRemovedException If entry has been removed. */ @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext"}) - @Nullable public synchronized GridTuple3<GridCacheVersion, CacheObject, byte[]> versionedValue(long topVer) + @Nullable public synchronized IgniteBiTuple<GridCacheVersion, CacheObject> versionedValue(long topVer) throws GridCacheEntryRemovedException { if (isNew() || !valid(-1) || deletedUnlocked()) return null; else { CacheObject val0 = valueBytesUnlocked(); - return F.t(ver, val0, null); + return F.t(ver, val0); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 8eb0809..bc84749 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -338,7 +338,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (txFut != null) txFut.markInitialized(); - IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> fut = null; + IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> fut; if (txFut == null || txFut.isDone()) { if (reload && cctx.readThrough() && cctx.store().configured()) { @@ -361,9 +361,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col fut = tx.getAllAsync(cctx, keys.keySet(), null, - false, + /*deserialize portable*/false, skipVals, - true); + /*keep cache objects*/true); } } } @@ -397,9 +397,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col return tx.getAllAsync(cctx, keys.keySet(), null, - false, + /*deserialize portable*/false, skipVals, - true); + /*keep cache objects*/true); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index f3b60c7..48d15aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -29,7 +29,7 @@ import java.util.*; * DHT partition topology. */ @GridToStringExclude -public interface GridDhtPartitionTopology<K, V> { +public interface GridDhtPartitionTopology { /** * Locks the topology, usually during mapping on locks or transactions. */ @@ -46,7 +46,7 @@ public interface GridDhtPartitionTopology<K, V> { * @param exchId Exchange ID. * @param exchFut Exchange future. */ - public void updateTopologyVersion(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture<K, V> exchFut); + public void updateTopologyVersion(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut); /** * Topology version. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 6794569..3ec1113 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -38,7 +38,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * Partition topology. */ @GridToStringExclude -class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, V> { +class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { /** If true, then check consistency. */ private static final boolean CONSISTENCY_CHECK = false; @@ -150,7 +150,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, /** {@inheritDoc} */ @Override public void updateTopologyVersion(GridDhtPartitionExchangeId exchId, - GridDhtPartitionsExchangeFuture<K, V> exchFut) { + GridDhtPartitionsExchangeFuture exchFut) { lock.writeLock().lock(); try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index e367588..04b6f4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -825,7 +825,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (log.isDebugEnabled()) log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']'); - IgniteInternalFuture<GridCacheReturn<Object>> txFut = tx.lockAllAsync( + IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync( cacheCtx, entries, req.onePhaseCommit(), @@ -837,9 +837,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach return new GridDhtEmbeddedFuture( txFut, - new C2<GridCacheReturn<V>, Exception, IgniteInternalFuture<GridNearLockResponse>>() { + new C2<GridCacheReturn, Exception, IgniteInternalFuture<GridNearLockResponse>>() { @Override public IgniteInternalFuture<GridNearLockResponse> apply( - GridCacheReturn<V> o, Exception e) { + GridCacheReturn o, Exception e) { if (e != null) e = U.unwrap(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index ce35b2e..5cca001 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -500,7 +500,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @return Lock future. */ @SuppressWarnings("ForLoopReplaceableByForEach") - IgniteInternalFuture<GridCacheReturn<Object>> lockAllAsync( + IgniteInternalFuture<GridCacheReturn> lockAllAsync( GridCacheContext cacheCtx, List<GridCacheEntryEx> entries, boolean onePhaseCommit, @@ -515,7 +515,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { return new GridFinishedFuture<>(cctx.kernalContext(), e); } - final GridCacheReturn<Object> ret = new GridCacheReturn<>(localResult(), false); + final GridCacheReturn ret = new GridCacheReturn(localResult(), false); if (F.isEmpty(entries)) return new GridFinishedFuture<>(cctx.kernalContext(), ret); @@ -606,9 +606,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param filter Entry write filter. * @return Future for lock acquisition. */ - private IgniteInternalFuture<GridCacheReturn<Object>> obtainLockAsync( + private IgniteInternalFuture<GridCacheReturn> obtainLockAsync( final GridCacheContext cacheCtx, - GridCacheReturn<Object> ret, + GridCacheReturn ret, final Collection<KeyCacheObject> passedKeys, final boolean read, final Set<KeyCacheObject> skipped, @@ -635,8 +635,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { return new GridEmbeddedFuture<>( fut, - new PLC1<GridCacheReturn<Object>>(ret) { - @Override protected GridCacheReturn<Object> postLock(GridCacheReturn<Object> ret) throws IgniteCheckedException { + new PLC1<GridCacheReturn>(ret) { + @Override protected GridCacheReturn postLock(GridCacheReturn ret) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Acquired transaction lock on keys: " + passedKeys); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index bb12cd8..dd14954 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -113,7 +113,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu private boolean retVal; /** Return value. */ - private GridCacheReturn<Object> ret; + private GridCacheReturn ret; /** Keys that did not pass the filter. */ private Collection<IgniteTxKey> filterFailedKeys; @@ -272,7 +272,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * */ private void onEntriesLocked() { - ret = new GridCacheReturn<>(null, tx.localResult(), null, true); + ret = new GridCacheReturn(null, tx.localResult(), null, true); for (IgniteTxEntry txEntry : tx.optimisticLockEntries()) { GridCacheContext cacheCtx = txEntry.context(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index a502a87..a5f319e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -415,23 +415,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); - return putxAsync(key, newVal, ctx.equalsValArray(oldVal)); } /** {@inheritDoc} */ - @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException { + @Override public GridCacheReturn removex(K key, V val) throws IgniteCheckedException { return removexAsync(key, val).get(); } /** {@inheritDoc} */ - @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { + @Override public GridCacheReturn replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { return replacexAsync(key, oldVal, newVal).get(); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { + @Override public IgniteInternalFuture<GridCacheReturn> removexAsync(K key, V val) { A.notNull(key, "key", val, "val"); return removeAllAsync0(F.asList(key), null, true, true, ctx.equalsValArray(val)); @@ -439,7 +438,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { + @Override public IgniteInternalFuture<GridCacheReturn> replacexAsync(K key, V oldVal, V newVal) { return updateAllAsync0(F.asMap(key, newVal), null, null, @@ -943,7 +942,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { success = false; } else - ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, false); + ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true); } else success = false; @@ -1104,7 +1103,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { expiry = expiryPolicy(req.expiry()); - GridCacheReturn<Object> retVal = null; + GridCacheReturn retVal = null; if (keys.size() > 1 && // Several keys ... writeThrough() && // and store is enabled ... @@ -1149,7 +1148,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (retVal == null) - retVal = new GridCacheReturn<>(ctx, node.isLocal(), null, true); + retVal = new GridCacheReturn(ctx, node.isLocal(), null, true); res.returnValue(retVal); } @@ -1641,7 +1640,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { String taskName, @Nullable IgniteCacheExpiryPolicy expiry ) throws GridCacheEntryRemovedException { - GridCacheReturn<Object> retVal = null; + GridCacheReturn retVal = null; Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; List<KeyCacheObject> keys = req.keys(); @@ -1799,7 +1798,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) { if (retVal == null) - retVal = new GridCacheReturn<>(node.isLocal()); + retVal = new GridCacheReturn(node.isLocal()); retVal.addEntryProcessResult(ctx, k, @@ -1813,7 +1812,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (retVal == null) { CacheObject ret = updRes.oldValue(); - retVal = new GridCacheReturn<>(ctx, + retVal = new GridCacheReturn(ctx, node.isLocal(), req.returnValue() ? ret : null, updRes.success()); @@ -2648,7 +2647,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ private static class UpdateSingleResult<K, V> { /** */ - private final GridCacheReturn<Object> retVal; + private final GridCacheReturn retVal; /** */ private final Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted; @@ -2661,7 +2660,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param deleted Deleted entries. * @param dhtFut DHT future. */ - private UpdateSingleResult(GridCacheReturn<Object> retVal, + private UpdateSingleResult(GridCacheReturn retVal, Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted, GridDhtAtomicUpdateFuture dhtFut) { this.retVal = retVal; @@ -2672,7 +2671,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @return Return value. */ - private GridCacheReturn<Object> returnValue() { + private GridCacheReturn returnValue() { return retVal; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index b9b4d39..3b61388 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -100,7 +100,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem private volatile CachePartialUpdateCheckedException err; /** Operation result. */ - private volatile GridCacheReturn<?> opRes; + private volatile GridCacheReturn opRes; /** Return value require flag. */ private final boolean retval; @@ -377,7 +377,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem onDone(opRes); } else { - GridCacheReturn<?> opRes0 = opRes = ret; + GridCacheReturn opRes0 = opRes = ret; onDone(opRes0); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index aa8955d..01d5722 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -153,7 +153,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** * @return Return value. */ - public GridCacheReturn<?> returnValue() { + public GridCacheReturn returnValue() { return ret; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 7ec6fe2..e1b8963 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -765,7 +765,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (log.isDebugEnabled()) log.debug("Performing colocated lock [tx=" + tx + ", keys=" + keys + ']'); - IgniteInternalFuture<GridCacheReturn<Object>> txFut = tx.lockAllAsync(cacheCtx, + IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(cacheCtx, keys, tx.implicit(), txRead, @@ -774,8 +774,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte return new GridDhtEmbeddedFuture<>( ctx.kernalContext(), txFut, - new C2<GridCacheReturn<Object>, Exception, Exception>() { - @Override public Exception apply(GridCacheReturn<Object> ret, + new C2<GridCacheReturn, Exception, Exception>() { + @Override public Exception apply(GridCacheReturn ret, Exception e) { if (e != null) e = U.unwrap(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 3c1b2fa..0f036c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -106,7 +106,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity new AtomicReference<>(); /** Map of current values. */ - private Map<KeyCacheObject, GridTuple3<GridCacheVersion, CacheObject, byte[]>> valMap; + private Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap; /** Trackable flag (here may be non-volatile). */ private boolean trackable; @@ -590,78 +590,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } /** - * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to - * remote primary node. - * - * @param mappings Queue of mappings. - * @throws IgniteCheckedException If mapping can not be completed. - */ - private void proceedMapping(final Deque<GridNearLockMapping> mappings) - throws IgniteCheckedException { - GridNearLockMapping map = mappings.poll(); - - // If there are no more mappings to process, complete the future. - if (map == null) - return; - - final GridNearLockRequest req = map.request(); - final Collection<KeyCacheObject> mappedKeys = map.distributedKeys(); - final ClusterNode node = map.node(); - - if (filter != null && filter.length != 0) - req.filter(filter, cctx); - - if (node.isLocal()) - lockLocally(mappedKeys, req.topologyVersion(), mappings); - else { - final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings); - - req.miniId(fut.futureId()); - - add(fut); // Append new future. - - IgniteInternalFuture<?> txSync = null; - - if (inTx()) - txSync = cctx.tm().awaitFinishAckAsync(node.id(), tx.threadId()); - - if (txSync == null || txSync.isDone()) { - try { - if (log.isDebugEnabled()) - log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - - cctx.io().send(node, req, cctx.ioPolicy()); - } - catch (ClusterTopologyCheckedException ex) { - assert fut != null; - - fut.onResult(ex); - } - } - else { - txSync.listenAsync(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - try { - if (log.isDebugEnabled()) - log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - - cctx.io().send(node, req, cctx.ioPolicy()); - } - catch (ClusterTopologyCheckedException ex) { - assert fut != null; - - fut.onResult(ex); - } - catch (IgniteCheckedException e) { - onError(e); - } - } - }); - } - } - } - - /** * Maps keys to nodes. Note that we can not simply group keys by nodes and send lock request as * such approach does not preserve order of lock acquisition. Instead, keys are split in continuous * groups belonging to one primary node and locks for these groups are acquired sequentially. @@ -759,7 +687,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity GridCacheMvccCandidate cand = addEntry(entry); // Will either return value from dht cache or null if this is a miss. - GridTuple3<GridCacheVersion, CacheObject, byte[]> val = entry.detached() ? null : + IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.detached() ? null : ((GridDhtCacheEntry)entry).versionedValue(topVer); GridCacheVersion dhtVer = null; @@ -863,6 +791,78 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } /** + * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to + * remote primary node. + * + * @param mappings Queue of mappings. + * @throws IgniteCheckedException If mapping can not be completed. + */ + private void proceedMapping(final Deque<GridNearLockMapping> mappings) + throws IgniteCheckedException { + GridNearLockMapping map = mappings.poll(); + + // If there are no more mappings to process, complete the future. + if (map == null) + return; + + final GridNearLockRequest req = map.request(); + final Collection<KeyCacheObject> mappedKeys = map.distributedKeys(); + final ClusterNode node = map.node(); + + if (filter != null && filter.length != 0) + req.filter(filter, cctx); + + if (node.isLocal()) + lockLocally(mappedKeys, req.topologyVersion(), mappings); + else { + final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings); + + req.miniId(fut.futureId()); + + add(fut); // Append new future. + + IgniteInternalFuture<?> txSync = null; + + if (inTx()) + txSync = cctx.tm().awaitFinishAckAsync(node.id(), tx.threadId()); + + if (txSync == null || txSync.isDone()) { + try { + if (log.isDebugEnabled()) + log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); + + cctx.io().send(node, req, cctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException ex) { + assert fut != null; + + fut.onResult(ex); + } + } + else { + txSync.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + try { + if (log.isDebugEnabled()) + log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); + + cctx.io().send(node, req, cctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException ex) { + assert fut != null; + + fut.onResult(ex); + } + catch (IgniteCheckedException e) { + onError(e); + } + } + }); + } + } + } + + /** * Locks given keys directly through dht cache. * * @param keys Collection of keys. @@ -1208,7 +1208,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity int i = 0; for (KeyCacheObject k : keys) { - GridTuple3<GridCacheVersion, CacheObject, byte[]> oldValTup = valMap.get(k); + IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k); CacheObject newVal = res.value(i); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 1338788..db56ebb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -59,7 +59,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec private GridCacheContext<K, V> cctx; /** Topology. */ - private GridDhtPartitionTopology<K, V> top; + private GridDhtPartitionTopology top; /** Logger. */ private IgniteLogger log; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 4e5beed..1589b77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -92,7 +92,7 @@ public class GridDhtPartitionDemandPool<K, V> { private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>(); /** Last exchange future. */ - private volatile GridDhtPartitionsExchangeFuture<K, V> lastExchangeFut; + private volatile GridDhtPartitionsExchangeFuture lastExchangeFut; /** * @param cctx Cache context. @@ -202,7 +202,7 @@ public class GridDhtPartitionDemandPool<K, V> { if (obj != null) cctx.time().removeTimeoutObject(obj); - final GridDhtPartitionsExchangeFuture<K, V> exchFut = lastExchangeFut; + final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; if (exchFut != null) { if (log.isDebugEnabled()) @@ -351,7 +351,7 @@ public class GridDhtPartitionDemandPool<K, V> { if (obj != null) cctx.time().removeTimeoutObject(obj); - final GridDhtPartitionsExchangeFuture<K, V> exchFut = lastExchangeFut; + final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; assert exchFut != null : "Delaying preload process without topology event."; @@ -569,8 +569,8 @@ public class GridDhtPartitionDemandPool<K, V> { * @throws IgniteCheckedException If failed to send message. */ private Set<Integer> demandFromNode(ClusterNode node, final long topVer, GridDhtPartitionDemandMessage d, - GridDhtPartitionsExchangeFuture<K, V> exchFut) throws InterruptedException, IgniteCheckedException { - GridDhtPartitionTopology<K, V> top = cctx.dht().topology(); + GridDhtPartitionsExchangeFuture exchFut) throws InterruptedException, IgniteCheckedException { + GridDhtPartitionTopology top = cctx.dht().topology(); cntr++; @@ -834,7 +834,7 @@ public class GridDhtPartitionDemandPool<K, V> { } } - GridDhtPartitionsExchangeFuture<K, V> exchFut = null; + GridDhtPartitionsExchangeFuture exchFut = null; boolean stopEvtFired = false; @@ -959,7 +959,7 @@ public class GridDhtPartitionDemandPool<K, V> { * * @param lastFut Last future to set. */ - void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture<K, V> lastFut) { + void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { lastExchangeFut = lastFut; } @@ -967,9 +967,9 @@ public class GridDhtPartitionDemandPool<K, V> { * @param exchFut Exchange future. * @return Assignments of partitions to nodes. */ - GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture<K, V> exchFut) { + GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) { // No assignments for disabled preloader. - GridDhtPartitionTopology<K, V> top = cctx.dht().topology(); + GridDhtPartitionTopology top = cctx.dht().topology(); if (!cctx.preloadEnabled()) return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index 24f7c94..7b1786d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@ -54,7 +54,7 @@ class GridDhtPartitionSupplyPool<K, V> { private final ReadWriteLock busyLock; /** */ - private GridDhtPartitionTopology<K, V> top; + private GridDhtPartitionTopology top; /** */ private final Collection<SupplyWorker> workers = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 1918f61..1cfd3a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -47,8 +47,8 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; /** * Future for exchanging partition maps. */ -public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Long> - implements Comparable<GridDhtPartitionsExchangeFuture<K, V>>, GridDhtTopologyFuture { +public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<Long> + implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture { /** */ private static final long serialVersionUID = 0L; @@ -99,7 +99,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon private volatile GridTimeoutObject timeoutObj; /** Cache context. */ - private final GridCacheSharedContext<K, V> cctx; + private final GridCacheSharedContext<?, ?> cctx; /** Busy lock to prevent activities from accessing exchanger while it's stopping. */ private ReadWriteLock busyLock; @@ -150,7 +150,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon * @param discoEvt Discovery event. * @param exchId Exchange id. */ - public GridDhtPartitionsExchangeFuture(GridCacheSharedContext<K, V> cctx, boolean reassign, DiscoveryEvent discoEvt, + public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, boolean reassign, DiscoveryEvent discoEvt, GridDhtPartitionExchangeId exchId) { super(cctx.kernalContext()); dummy = true; @@ -174,7 +174,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon * @param discoEvt Discovery event. * @param exchId Exchange id. */ - public GridDhtPartitionsExchangeFuture(GridCacheSharedContext<K, V> cctx, DiscoveryEvent discoEvt, + public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, DiscoveryEvent discoEvt, GridDhtPartitionExchangeId exchId) { super(cctx.kernalContext()); dummy = false; @@ -196,7 +196,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon * @param busyLock Busy lock. * @param exchId Exchange ID. */ - public GridDhtPartitionsExchangeFuture(GridCacheSharedContext<K, V> cctx, ReadWriteLock busyLock, + public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, ReadWriteLock busyLock, GridDhtPartitionExchangeId exchId) { super(cctx.kernalContext()); @@ -283,7 +283,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon /** * Rechecks topology. */ - private void initTopology(GridCacheContext<K, V> cacheCtx) throws IgniteCheckedException { + private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException { if (canCalculateAffinity(cacheCtx)) { if (log.isDebugEnabled()) log.debug("Will recalculate affinity [locNodeId=" + cctx.localNodeId() + ", exchId=" + exchId + ']'); @@ -296,8 +296,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon exchId + ']'); // Fetch affinity assignment from remote node. - GridDhtAssignmentFetchFuture<K, V> fetchFut = - new GridDhtAssignmentFetchFuture<>(cacheCtx, exchId.topologyVersion(), CU.affinityNodes(cacheCtx)); + GridDhtAssignmentFetchFuture fetchFut = + new GridDhtAssignmentFetchFuture(cacheCtx, exchId.topologyVersion(), CU.affinityNodes(cacheCtx)); fetchFut.init(); @@ -314,7 +314,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon /** * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange. */ - private boolean canCalculateAffinity(GridCacheContext<K, V> cacheCtx) { + private boolean canCalculateAffinity(GridCacheContext cacheCtx) { CacheAffinityFunction affFunc = cacheCtx.config().getAffinity(); // Do not request affinity from remote nodes if affinity function is not centralized. @@ -425,7 +425,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon assert exchId.nodeId().equals(discoEvt.eventNode().id()); - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { // Update before waiting for locks. if (!cacheCtx.isLocal()) cacheCtx.topology().updateTopologyVersion(exchId, this); @@ -446,7 +446,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon long topVer = exchId.topologyVersion(); - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -469,7 +469,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon if (log.isDebugEnabled()) log.debug("After waiting for partition release future: " + this); - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -483,7 +483,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon // Process queued undeploys prior to sending/spreading map. cacheCtx.preloader().unwindUndeploys(); - GridDhtPartitionTopology<K, V> top = cacheCtx.topology(); + GridDhtPartitionTopology top = cacheCtx.topology(); assert topVer == top.topologyVersion() : "Topology version is updated only in this class instances inside single ExchangeWorker thread."; @@ -491,7 +491,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon top.beforeExchange(exchId); } - for (GridClientPartitionTopology<K, V> top : cctx.exchange().clientTopologies()) { + for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { top.updateTopologyVersion(exchId, this); top.beforeExchange(exchId); @@ -553,7 +553,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last()); - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) m.addLocalPartitionMap(cacheCtx.cacheId(), cacheCtx.topology().localPartitionMap()); } @@ -575,12 +575,12 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon lastVer.get(), id.topologyVersion()); - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); } - for (GridClientPartitionTopology<K, V> top : cctx.exchange().clientTopologies()) + for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); if (log.isDebugEnabled()) @@ -635,7 +635,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon /** {@inheritDoc} */ @Override public boolean onDone(Long res, Throwable err) { if (err == null) { - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) cacheCtx.affinity().cleanUpCache(res - 10); } @@ -655,7 +655,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon if (timeoutObj != null) cctx.kernalContext().timeout().removeTimeoutObject(timeoutObj); - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (exchId.event() == EventType.EVT_NODE_FAILED || exchId.event() == EventType.EVT_NODE_LEFT) cacheCtx.config().getAffinity().removeNode(exchId.nodeId()); } @@ -852,7 +852,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { Integer cacheId = entry.getKey(); - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); if (cacheCtx != null) cacheCtx.topology().update(exchId, entry.getValue()); @@ -869,9 +869,9 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon private void updatePartitionSingleMap(GridDhtPartitionsSingleMessage msg) { for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { Integer cacheId = entry.getKey(); - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - GridDhtPartitionTopology<K, V> top = cacheCtx != null ? cacheCtx.topology() : + GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : cctx.exchange().clientTopology(cacheId, exchId); top.update(exchId, entry.getValue()); @@ -925,7 +925,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon // If local node is just joining. if (exchId.nodeId().equals(cctx.localNodeId())) { try { - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) cacheCtx.topology().beforeExchange(exchId); } @@ -1078,7 +1078,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon } /** {@inheritDoc} */ - @Override public int compareTo(GridDhtPartitionsExchangeFuture<K, V> fut) { + @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) { return exchId.compareTo(fut.exchId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index eb9c9b8..c1d94bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -49,7 +49,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500; /** */ - private GridDhtPartitionTopology<K, V> top; + private GridDhtPartitionTopology top; /** Topology version. */ private final GridAtomicLong topVer = new GridAtomicLong(); @@ -70,7 +70,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); /** Pending affinity assignment futures. */ - private ConcurrentMap<Long, GridDhtAssignmentFetchFuture<K, V>> pendingAssignmentFetchFuts = + private ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts = new ConcurrentHashMap8<>(); /** Discovery listener. */ @@ -102,7 +102,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { e.topologyVersion() + ", curVer=" + topVer.get() + ']'; if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { - for (GridDhtAssignmentFetchFuture<K, V> fut : pendingAssignmentFetchFuts.values()) + for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) fut.onNodeLeft(e.eventNode().id()); } } @@ -246,12 +246,12 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture<K, V> lastFut) { + @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { demandPool.updateLastExchangeFuture(lastFut); } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture<K, V> exchFut) { + @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) { return demandPool.assign(exchFut); } @@ -276,8 +276,8 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @param topVer Requested topology version. * @param fut Future to add. */ - public void addDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture<K, V> fut) { - GridDhtAssignmentFetchFuture<K, V> old = pendingAssignmentFetchFuts.putIfAbsent(topVer, fut); + public void addDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture fut) { + GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(topVer, fut); assert old == null : "More than one thread is trying to fetch partition assignments: " + topVer; } @@ -286,7 +286,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * @param topVer Requested topology version. * @param fut Future to remove. */ - public void removeDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture<K, V> fut) { + public void removeDhtAssignmentFetchFuture(long topVer, GridDhtAssignmentFetchFuture fut) { boolean rmv = pendingAssignmentFetchFuts.remove(topVer, fut); assert rmv : "Failed to remove assignment fetch future: " + topVer; @@ -455,7 +455,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Processing affinity assignment response [node=" + node + ", res=" + res + ']'); - for (GridDhtAssignmentFetchFuture<K, V> fut : pendingAssignmentFetchFuts.values()) + for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) fut.onResponse(node, res); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index 823d186..dda2115 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -33,7 +33,7 @@ public class GridDhtPreloaderAssignments<K, V> extends /** Exchange future. */ @GridToStringExclude - private final GridDhtPartitionsExchangeFuture<K, V> exchFut; + private final GridDhtPartitionsExchangeFuture exchFut; /** Last join order. */ private final long topVer; @@ -42,7 +42,7 @@ public class GridDhtPreloaderAssignments<K, V> extends * @param exchFut Exchange future. * @param topVer Last join order. */ - public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture<K, V> exchFut, long topVer) { + public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, long topVer) { assert exchFut != null; assert topVer > 0; @@ -53,7 +53,7 @@ public class GridDhtPreloaderAssignments<K, V> extends /** * @return Exchange future. */ - GridDhtPartitionsExchangeFuture<K, V> exchangeFuture() { + GridDhtPartitionsExchangeFuture exchangeFuture() { return exchFut; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index a5050e6..d27f516 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -483,24 +483,24 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException { + @Override public GridCacheReturn removex(K key, V val) throws IgniteCheckedException { return dht.removex(key, val); } /** {@inheritDoc} */ - @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { + @Override public GridCacheReturn replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { return dht.replacex(key, oldVal, newVal); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { + @Override public IgniteInternalFuture<GridCacheReturn> removexAsync(K key, V val) { return dht.removexAsync(key, val); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { + @Override public IgniteInternalFuture<GridCacheReturn> replacexAsync(K key, V oldVal, V newVal) { return dht.replacexAsync(key, oldVal, newVal); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index b5cbe69..e1983d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -266,7 +266,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { * @return Tuple with version and value of this entry. * @throws GridCacheEntryRemovedException If entry has been removed. */ - @Nullable public synchronized GridTuple3<GridCacheVersion, CacheObject, byte[]> versionedValue() + @Nullable public synchronized IgniteBiTuple<GridCacheVersion, CacheObject> versionedValue() throws GridCacheEntryRemovedException { checkObsolete(); @@ -275,7 +275,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { else { CacheObject val0 = valueBytesUnlocked(); - return F.t(ver, val0, null); + return F.t(ver, val0); } } @@ -315,14 +315,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { taskName, true, null, - false).get().get(key.value(cctx.cacheObjectContext(), false)); + false).get().get(keyValue(false)); } /** * @param tx Transaction. * @param primaryNodeId Primary node ID. * @param val New value. - * @param valBytes Value bytes. * @param ver Version to use. * @param dhtVer DHT version received from remote node. * @param expVer Optional version to match. @@ -339,7 +338,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { public boolean loadedValue(@Nullable IgniteInternalTx tx, UUID primaryNodeId, CacheObject val, - byte[] valBytes, GridCacheVersion ver, GridCacheVersion dhtVer, @Nullable GridCacheVersion expVer, @@ -373,7 +371,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { update(val, expireTime, ttl, ver); if (cctx.deferredDelete()) { - boolean deleted = val == null && valBytes == null; + boolean deleted = val == null; if (deleted != deletedUnlocked()) { deletedUnlocked(deleted); @@ -391,7 +389,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) cctx.events().addEvent(partition(), key, tx, null, EVT_CACHE_OBJECT_READ, - val, val != null || valBytes != null, old, hasVal, subjId, null, null); + val, val != null, old, hasVal, subjId, null, null); return ret; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index e1d35ed..c2a00e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -490,7 +490,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma } if (v != null && !reload) { - K key0 = key.value(cctx.cacheObjectContext(), false); + K key0 = key.value(cctx.cacheObjectContext(), true); V val0 = v.value(cctx.cacheObjectContext(), true); val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable); @@ -612,7 +612,6 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma entry.loadedValue(tx, nodeId, info.value(), - null, atomic ? info.version() : ver, info.version(), saved, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 3895924..0ffc465 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -108,7 +108,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B new AtomicReference<>(); /** Map of current values. */ - private Map<KeyCacheObject, GridTuple3<GridCacheVersion, CacheObject, byte[]>> valMap; + private Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap; /** Trackable flag. */ private boolean trackable = true; @@ -819,7 +819,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (tx == null && !cand.reentry()) cctx.mvcc().addExplicitLock(threadId, cand, snapshot); - GridTuple3<GridCacheVersion, CacheObject, byte[]> val = entry.versionedValue(); + IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue(); if (val == null) { GridDhtCacheEntry dhtEntry = dht().peekExx(key); @@ -1003,7 +1003,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B GridNearCacheEntry entry = cctx.near().entryExx(k, req.topologyVersion()); try { - GridTuple3<GridCacheVersion, CacheObject, byte[]> oldValTup = + IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key()); boolean hasBytes = entry.hasValue(); @@ -1356,7 +1356,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B return; } - GridTuple3<GridCacheVersion, CacheObject, byte[]> oldValTup = valMap.get(entry.key()); + IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key()); CacheObject oldVal = entry.rawGet(); boolean hasOldVal = false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 2da3f9d..6669773 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -1066,7 +1066,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ - public <K> IgniteInternalFuture<GridCacheReturn<Object>> lockAllAsync(GridCacheContext cacheCtx, + public <K> IgniteInternalFuture<GridCacheReturn> lockAllAsync(GridCacheContext cacheCtx, final Collection<? extends K> keys, boolean implicit, boolean read, @@ -1080,7 +1080,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { return new GridFinishedFuture<>(cctx.kernalContext(), e); } - final GridCacheReturn<Object> ret = new GridCacheReturn<>(localResult(), false); + final GridCacheReturn ret = new GridCacheReturn(localResult(), false); if (F.isEmpty(keys)) return new GridFinishedFuture<>(cctx.kernalContext(), ret); @@ -1102,8 +1102,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { return new GridEmbeddedFuture<>( fut, - new PLC1<GridCacheReturn<Object>>(ret, false) { - @Override protected GridCacheReturn<Object> postLock(GridCacheReturn<Object> ret) { + new PLC1<GridCacheReturn>(ret, false) { + @Override protected GridCacheReturn postLock(GridCacheReturn ret) { if (log.isDebugEnabled()) log.debug("Acquired transaction lock on keys: " + keys); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index 31b5f4a..2456674 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -74,7 +74,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse private Collection<CacheVersionedValue> ownedValVals; /** Cache return value. */ - private GridCacheReturn<Object> retVal; + private GridCacheReturn retVal; /** Filter failed keys. */ @GridDirectCollection(IgniteTxKey.class) @@ -102,7 +102,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse IgniteUuid miniId, GridCacheVersion dhtVer, Collection<Integer> invalidParts, - GridCacheReturn<Object> retVal, + GridCacheReturn retVal, Throwable err ) { super(xid, err); @@ -188,7 +188,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse /** * @return Return value. */ - public GridCacheReturn<Object> returnValue() { + public GridCacheReturn returnValue() { return retVal; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 11571d0..8fb8ab4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -260,12 +260,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { + @Override public GridCacheReturn replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); ctx.denyOnLocalRead(); - return (GridCacheReturn<V>)updateAllInternal(UPDATE, + return (GridCacheReturn)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(newVal), null, @@ -278,12 +278,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException { + @Override public GridCacheReturn removex(K key, V val) throws IgniteCheckedException { A.notNull(key, "key", val, "val"); ctx.denyOnLocalRead(); - return (GridCacheReturn<V>)updateAllInternal(DELETE, + return (GridCacheReturn)updateAllInternal(DELETE, Collections.singleton(key), null, null, @@ -296,7 +296,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { + @Override public IgniteInternalFuture<GridCacheReturn> removexAsync(K key, V val) { A.notNull(key, "key", val, "val"); ctx.denyOnLocalRead(); @@ -306,7 +306,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { + @Override public IgniteInternalFuture<GridCacheReturn> replacexAsync(K key, V oldVal, V newVal) { A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); ctx.denyOnLocalRead(); @@ -1034,7 +1034,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (err != null) throw err; - Object ret = res == null ? null : rawRetval ? new GridCacheReturn<>(ctx, true, res.get2(), res.get1()) : + Object ret = res == null ? null : rawRetval ? new GridCacheReturn(ctx, true, res.get2(), res.get1()) : (retval || op == TRANSFORM) ? res.get2() : res.get1(); if (op == TRANSFORM && ret == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ae9ec6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 53a2cdb..ebe691a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -290,7 +290,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @param ctx Kernal context. * @return Continuous query manager. */ - private CacheContinuousQueryManager<K, V> manager(GridKernalContext ctx) { + private CacheContinuousQueryManager manager(GridKernalContext ctx) { return cacheContext(ctx).continuousQueries(); }