http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 88dc193..c9681c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -258,7 +258,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param topVer Topology version. * @return Finish future. */ - @Nullable public IgniteFuture<?> multiUpdateFinishFuture(long topVer) { + @Nullable public IgniteInternalFuture<?> multiUpdateFinishFuture(long topVer) { GridCompoundFuture<IgniteUuid, Object> fut = null; for (MultiUpdateFuture multiFut : multiTxFuts.values()) { @@ -487,7 +487,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param filter {@inheritDoc} * @return {@inheritDoc} */ - @Override public IgniteFuture<Map<K, V>> getAllAsync( + @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( @Nullable Collection<? extends K> keys, boolean forcePrimary, boolean skipTx, @@ -530,7 +530,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param expiry Expiry policy. * @return Get future. */ - IgniteFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys, + IgniteInternalFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys, boolean readThrough, @Nullable UUID subjId, String taskName, @@ -605,7 +605,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap final GetExpiryPolicy expiryPlc = ttl == -1L ? null : new GetExpiryPolicy(ttl); - IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut = + IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> fut = getDhtAsync(nodeId, req.messageId(), req.keys(), @@ -618,8 +618,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap req.filter(), expiryPlc); - fut.listenAsync(new CI1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>>() { - @Override public void apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> f) { + fut.listenAsync(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>>>() { + @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> f) { GridNearGetResponse<K, V> res = new GridNearGetResponse<>(ctx.cacheId(), req.futureId(), req.miniId(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 52be7a5..550a693 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -355,7 +356,7 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * @throws GridCacheEntryRemovedException If entry was removed. */ @SuppressWarnings("unchecked") - @Nullable public IgniteFuture<Boolean> addReader(UUID nodeId, long msgId, long topVer) + @Nullable public IgniteInternalFuture<Boolean> addReader(UUID nodeId, long msgId, long topVer) throws GridCacheEntryRemovedException { // Don't add local node as reader. if (cctx.nodeId().equals(nodeId)) @@ -450,8 +451,8 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { if (!txFut.isDone()) { final ReaderId<K, V> reader0 = reader; - txFut.listenAsync(new CI1<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> f) { + txFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { synchronized (this) { // Release memory. reader0.resetTxFuture(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java index 15ca4a0..326a813 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java @@ -49,7 +49,7 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem * @param embedded Embedded. * @param c Closure. */ - public GridDhtEmbeddedFuture(GridKernalContext ctx, IgniteFuture<B> embedded, IgniteBiClosure<B, Exception, A> c) { + public GridDhtEmbeddedFuture(GridKernalContext ctx, IgniteInternalFuture<B> embedded, IgniteBiClosure<B, Exception, A> c) { super(ctx, embedded, c); invalidParts = Collections.emptyList(); @@ -60,8 +60,8 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem * @param c Embedding closure. * @param ctx Kernal context. */ - public GridDhtEmbeddedFuture(IgniteFuture<B> embedded, - IgniteBiClosure<B, Exception, IgniteFuture<A>> c, GridKernalContext ctx) { + public GridDhtEmbeddedFuture(IgniteInternalFuture<B> embedded, + IgniteBiClosure<B, Exception, IgniteInternalFuture<A>> c, GridKernalContext ctx) { super(embedded, c, ctx); invalidParts = Collections.emptyList(); @@ -73,7 +73,7 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem * @param c Closure. * @param invalidParts Retries. */ - public GridDhtEmbeddedFuture(GridKernalContext ctx, IgniteFuture<B> embedded, IgniteBiClosure<B, Exception, A> c, + public GridDhtEmbeddedFuture(GridKernalContext ctx, IgniteInternalFuture<B> embedded, IgniteBiClosure<B, Exception, A> c, Collection<Integer> invalidParts) { super(ctx, embedded, c); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFuture.java index af494d5..9044bfb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFuture.java @@ -17,14 +17,14 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import org.apache.ignite.lang.*; +import org.apache.ignite.internal.*; import java.util.*; /** * Keys to retry. */ -public interface GridDhtFuture<T> extends IgniteFuture<T> { +public interface GridDhtFuture<T> extends IgniteInternalFuture<T> { /** * Node that future should be able to provide keys to retry before * it completes, so it's not necessary to wait till future is done http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 251d550..b6282c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; @@ -286,7 +287,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @return Future for local get. */ @SuppressWarnings( {"unchecked", "IfMayBeConditional"}) - private IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> getAsync(final LinkedHashMap<? extends K, Boolean> keys) { + private IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> getAsync(final LinkedHashMap<? extends K, Boolean> keys) { if (F.isEmpty(keys)) return new GridFinishedFuture<Collection<GridCacheEntryInfo<K, V>>>(cctx.kernalContext(), Collections.<GridCacheEntryInfo<K, V>>emptyList()); @@ -321,7 +322,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col // TODO: To fix, check that reader is contained in the list of readers once // TODO: again after the returned future completes - if not, try again. // TODO: Also, why is info read before transactions are complete, and not after? - IgniteFuture<Boolean> f = (!e.deleted() && k.getValue()) ? e.addReader(reader, msgId, topVer) : null; + IgniteInternalFuture<Boolean> f = (!e.deleted() && k.getValue()) ? e.addReader(reader, msgId, topVer) : null; if (f != null) { if (txFut == null) @@ -347,7 +348,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (txFut != null) txFut.markInitialized(); - IgniteFuture<Map<K, V>> fut; + IgniteInternalFuture<Map<K, V>> fut; if (txFut == null || txFut.isDone()) { if (reload && cctx.readThrough() && cctx.store().configured()) { @@ -382,8 +383,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col // transactions to complete. fut = new GridEmbeddedFuture<>( txFut, - new C2<Boolean, Exception, IgniteFuture<Map<K, V>>>() { - @Override public IgniteFuture<Map<K, V>> apply(Boolean b, Exception e) { + new C2<Boolean, Exception, IgniteInternalFuture<Map<K, V>>>() { + @Override public IgniteInternalFuture<Map<K, V>> apply(Boolean b, Exception e) { if (e != null) throw new GridClosureException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 373b488..a3a0ab7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; @@ -390,7 +391,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti * @param updateSeq Update sequence. * @return Future to signal that this node is no longer an owner or backup. */ - IgniteFuture<?> rent(boolean updateSeq) { + IgniteInternalFuture<?> rent(boolean updateSeq) { while (true) { int reservations = state.getStamp(); @@ -418,7 +419,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti * @param updateSeq Update sequence. * @return Future for evict attempt. */ - private IgniteFuture<Boolean> tryEvictAsync(boolean updateSeq) { + private IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) { if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) { if (log.isDebugEnabled()) log.debug("Evicted partition: " + this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 8174985..f0da6b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -234,8 +235,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @return Participating nodes. */ @Override public Collection<? extends ClusterNode> nodes() { - return F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { + return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { if (isMini(f)) return ((MiniFuture)f).node(); @@ -470,7 +471,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo @Override public boolean onNodeLeft(UUID nodeId) { boolean found = false; - for (IgniteFuture<?> fut : futures()) { + for (IgniteInternalFuture<?> fut : futures()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -496,7 +497,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo boolean found = false; - for (IgniteFuture<Boolean> fut : pending()) { + for (IgniteInternalFuture<Boolean> fut : pending()) { if (isMini(fut)) { MiniFuture mini = (MiniFuture)fut; @@ -712,7 +713,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<?> f) { + private boolean isMini(IgniteInternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java index 45eda34..2d2f431 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; -import org.apache.ignite.lang.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.discovery.*; /** @@ -29,7 +29,7 @@ import org.apache.ignite.internal.managers.discovery.*; * When new new transaction is started, it will wait for this future before acquiring new locks on particular * topology version. */ -public interface GridDhtTopologyFuture extends IgniteFuture<Long> { +public interface GridDhtTopologyFuture extends IgniteInternalFuture<Long> { /** * Gets a topology snapshot for the topology version represented by the future. Note that by the time * partition exchange completes some nodes from the snapshot may leave the grid. One should use discovery http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 3d5477b..64c0811 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -343,14 +344,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param req Request. */ protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest<K, V> req) { - IgniteFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : + IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion()); if (keyFut == null || keyFut.isDone()) processDhtLockRequest0(nodeId, req); else { - keyFut.listenAsync(new CI1<IgniteFuture<Object>>() { - @Override public void apply(IgniteFuture<Object> t) { + keyFut.listenAsync(new CI1<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> t) { processDhtLockRequest0(nodeId, req); } }); @@ -521,7 +522,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } // Group lock can be only started from local node, so we never start group lock transaction on remote node. - IgniteFuture<?> f = lockAllAsync(ctx, nearNode, req, null); + IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null); // Register listener just so we print out errors. // Exclude lock timeout exception since it's not a fatal exception. @@ -550,7 +551,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys, + @Override public IgniteInternalFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys, long timeout, IgniteTxLocalEx<K, V> txx, boolean isInvalidate, @@ -665,14 +666,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param filter0 Filter. * @return Future. */ - public IgniteFuture<GridNearLockResponse<K, V>> lockAllAsync( + public IgniteInternalFuture<GridNearLockResponse<K, V>> lockAllAsync( final GridCacheContext<K, V> cacheCtx, final ClusterNode nearNode, final GridNearLockRequest<K, V> req, @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter0) { final List<K> keys = req.keys(); - IgniteFuture<Object> keyFut = null; + IgniteInternalFuture<Object> keyFut = null; if (req.onePhaseCommit()) { boolean forceKeys = req.hasTransforms() || req.filter() != null; @@ -690,8 +691,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach keyFut = new GridFinishedFutureEx<>(); return new GridEmbeddedFuture<>(true, keyFut, - new C2<Object, Exception, IgniteFuture<GridNearLockResponse<K,V>>>() { - @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(Object o, Exception exx) { + new C2<Object, Exception, IgniteInternalFuture<GridNearLockResponse<K,V>>>() { + @Override public IgniteInternalFuture<GridNearLockResponse<K, V>> apply(Object o, Exception exx) { if (exx != null) return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx); @@ -831,7 +832,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert req.writeEntries() == null || req.writeEntries().size() == entries.size(); - IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync( + IgniteInternalFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync( cacheCtx, entries, req.writeEntries(), @@ -846,8 +847,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach return new GridDhtEmbeddedFuture<>( txFut, - new C2<GridCacheReturn<V>, Exception, IgniteFuture<GridNearLockResponse<K, V>>>() { - @Override public IgniteFuture<GridNearLockResponse<K, V>> apply( + new C2<GridCacheReturn<V>, Exception, IgniteInternalFuture<GridNearLockResponse<K, V>>>() { + @Override public IgniteInternalFuture<GridNearLockResponse<K, V>> apply( GridCacheReturn<V> o, Exception e) { if (e != null) e = U.unwrap(e); @@ -866,8 +867,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert t.implicit(); return t.commitAsync().chain( - new C1<IgniteFuture<IgniteTx>, GridNearLockResponse<K, V>>() { - @Override public GridNearLockResponse<K, V> apply(IgniteFuture<IgniteTx> f) { + new C1<IgniteInternalFuture<IgniteTx>, GridNearLockResponse<K, V>>() { + @Override public GridNearLockResponse<K, V> apply(IgniteInternalFuture<IgniteTx> f) { try { // Check for error. f.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 2672d30..b5c7927 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -122,8 +123,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur */ @Override public Collection<? extends ClusterNode> nodes() { return - F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { + F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { if (isMini(f)) return ((MiniFuture)f).node(); @@ -134,7 +135,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteFuture<?> fut : futures()) + for (IgniteInternalFuture<?> fut : futures()) if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -199,7 +200,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur */ public void onResult(UUID nodeId, GridDhtTxFinishResponse<K, V> res) { if (!isDone()) { - for (IgniteFuture<IgniteTx> fut : futures()) { + for (IgniteInternalFuture<IgniteTx> fut : futures()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -239,7 +240,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<?> f) { + private boolean isMini(IgniteInternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 38bc4ca..4d62ecf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -246,7 +247,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } /** {@inheritDoc} */ - @Override @Nullable protected IgniteFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, + @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, IgniteTxEntry<K, V> entry, long topVer) { // Don't add local node as reader. if (!cctx.localNodeId().equals(nearNodeId)) { @@ -276,7 +277,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } /** {@inheritDoc} */ - @Override public IgniteFuture<IgniteTxEx<K, V>> prepareAsync() { + @Override public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsync() { if (optimistic()) { assert isSystemInvalidate(); @@ -348,7 +349,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements * @param lastBackups IDs of backup nodes receiving last prepare request. * @return Future that will be completed when locks are acquired. */ - public IgniteFuture<IgniteTxEx<K, V>> prepareAsync(@Nullable Iterable<IgniteTxEntry<K, V>> reads, + public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsync(@Nullable Iterable<IgniteTxEntry<K, V>> reads, @Nullable Iterable<IgniteTxEntry<K, V>> writes, Map<IgniteTxKey<K>, GridCacheVersion> verMap, long msgId, @@ -449,7 +450,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements /** {@inheritDoc} */ @SuppressWarnings({"ThrowableInstanceNeverThrown"}) - @Override public IgniteFuture<IgniteTx> commitAsync() { + @Override public IgniteInternalFuture<IgniteTx> commitAsync() { if (log.isDebugEnabled()) log.debug("Committing dht local tx: " + this); @@ -486,8 +487,8 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } } else - prep.listenAsync(new CI1<IgniteFuture<IgniteTxEx<K, V>>>() { - @Override public void apply(IgniteFuture<IgniteTxEx<K, V>> f) { + prep.listenAsync(new CI1<IgniteInternalFuture<IgniteTxEx<K, V>>>() { + @Override public void apply(IgniteInternalFuture<IgniteTxEx<K, V>> f) { try { f.get(); // Check for errors of a parent future. @@ -544,7 +545,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } /** {@inheritDoc} */ - @Override public IgniteFuture<IgniteTx> rollbackAsync() { + @Override public IgniteInternalFuture<IgniteTx> rollbackAsync() { GridDhtTxPrepareFuture<K, V> prepFut = this.prepFut.get(); final GridDhtTxFinishFuture<K, V> fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); @@ -574,8 +575,8 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements else { prepFut.complete(); - prepFut.listenAsync(new CI1<IgniteFuture<IgniteTxEx<K, V>>>() { - @Override public void apply(IgniteFuture<IgniteTxEx<K, V>> f) { + prepFut.listenAsync(new CI1<IgniteInternalFuture<IgniteTxEx<K, V>>>() { + @Override public void apply(IgniteInternalFuture<IgniteTxEx<K, V>> f) { try { f.get(); // Check for errors of a parent future. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index d03c8d0..c3e8729 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -138,7 +139,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K * @param topVer Topology version. * @return {@code True} if reader was added as a result of this call. */ - @Nullable protected abstract IgniteFuture<Boolean> addReader(long msgId, + @Nullable protected abstract IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, IgniteTxEntry<K, V> entry, long topVer); @@ -389,7 +390,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K * @return Future for active transactions for the time when reader was added. * @throws IgniteCheckedException If failed. */ - @Nullable public IgniteFuture<Boolean> addEntry(long msgId, IgniteTxEntry<K, V> e) throws IgniteCheckedException { + @Nullable public IgniteInternalFuture<Boolean> addEntry(long msgId, IgniteTxEntry<K, V> e) throws IgniteCheckedException { init(); IgniteTxState state = state(); @@ -481,7 +482,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K * @param accessTtl TTL for read operation. * @return Lock future. */ - IgniteFuture<GridCacheReturn<V>> lockAllAsync( + IgniteInternalFuture<GridCacheReturn<V>> lockAllAsync( GridCacheContext<K, V> cacheCtx, Collection<GridCacheEntryEx<K, V>> entries, List<IgniteTxEntry<K, V>> writeEntries, @@ -609,7 +610,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K * @param filter Entry write filter. * @return Future for lock acquisition. */ - private IgniteFuture<GridCacheReturn<V>> obtainLockAsync( + private IgniteInternalFuture<GridCacheReturn<V>> obtainLockAsync( final GridCacheContext<K, V> cacheCtx, GridCacheReturn<V> ret, final Collection<? extends K> passedKeys, @@ -626,7 +627,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K GridDhtTransactionalCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx(); - IgniteFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys, + IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys, lockTimeout(), this, isInvalidate(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index cd4ab2b..947bec4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -175,8 +176,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu */ @Override public Collection<? extends ClusterNode> nodes() { return - F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { + F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { if (isMini(f)) return ((MiniFuture)f).node(); @@ -247,7 +248,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteFuture<?> fut : futures()) + for (IgniteInternalFuture<?> fut : futures()) if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -305,7 +306,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu */ public void onResult(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) { if (!isDone()) { - for (IgniteFuture<IgniteTxEx<K, V>> fut : pending()) { + for (IgniteInternalFuture<IgniteTxEx<K, V>> fut : pending()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -516,7 +517,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<?> f) { + private boolean isMini(IgniteInternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 7227026..5c3d0ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; @@ -202,8 +203,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M @SuppressWarnings("unchecked") @Override public Collection<? extends ClusterNode> nodes() { return - F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<Map<K, V>>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<Map<K, V>> f) { + F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<Map<K, V>>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<Map<K, V>> f) { if (isMini(f)) return ((MiniFuture)f).node(); @@ -214,7 +215,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteFuture<Map<K, V>> fut : futures()) + for (IgniteInternalFuture<Map<K, V>> fut : futures()) if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -233,7 +234,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param res Result. */ public void onResult(UUID nodeId, GridNearGetResponse<K, V> res) { - for (IgniteFuture<Map<K, V>> fut : futures()) + for (IgniteInternalFuture<Map<K, V>> fut : futures()) if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -264,7 +265,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<Map<K, V>> f) { + private boolean isMini(IgniteInternalFuture<Map<K, V>> f) { return f.getClass().equals(MiniFuture.class); } @@ -357,8 +358,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M } // Add new future. - add(fut.chain(new C1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>, Map<K, V>>() { - @Override public Map<K, V> apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut) { + add(fut.chain(new C1<IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>>, Map<K, V>>() { + @Override public Map<K, V> apply(IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> fut) { try { return createResultMap(fut.get()); } @@ -693,11 +694,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']'); // Need to wait for next topology version to remap. - IgniteFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer); + IgniteInternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer); - topFut.listenAsync(new CIX1<IgniteFuture<Long>>() { + topFut.listenAsync(new CIX1<IgniteInternalFuture<Long>>() { @SuppressWarnings("unchecked") - @Override public void applyx(IgniteFuture<Long> fut) throws IgniteCheckedException { + @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException { long topVer = fut.get(); // This will append new futures to compound list. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d2150de..3bdd976 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; @@ -255,7 +256,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<Map<K, V>> getAllAsync( + @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( @Nullable final Collection<? extends K> keys, final boolean forcePrimary, boolean skipTx, @@ -273,8 +274,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null; - return asyncOp(new CO<IgniteFuture<Map<K, V>>>() { - @Override public IgniteFuture<Map<K, V>> apply() { + return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() { + @Override public IgniteInternalFuture<Map<K, V>> apply() { return getAllAsync0(keys, false, forcePrimary, @@ -307,7 +308,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, + @Override public IgniteInternalFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, long ttl, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { A.notNull(key, "key"); @@ -324,7 +325,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, long ttl, + @Override public IgniteInternalFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, long ttl, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { A.notNull(key, "key"); @@ -345,7 +346,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<V> putIfAbsentAsync(K key, V val) { + @Override public IgniteInternalFuture<V> putIfAbsentAsync(K key, V val) { A.notNull(key, "key", val, "val"); return putAsync(key, val, ctx.noPeekArray()); @@ -357,7 +358,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> putxIfAbsentAsync(K key, V val) { + @Override public IgniteInternalFuture<Boolean> putxIfAbsentAsync(K key, V val) { A.notNull(key, "key", val, "val"); return putxAsync(key, val, ctx.noPeekArray()); @@ -369,7 +370,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<V> replaceAsync(K key, V val) { + @Override public IgniteInternalFuture<V> replaceAsync(K key, V val) { A.notNull(key, "key", val, "val"); return putAsync(key, val, ctx.hasPeekArray()); @@ -381,7 +382,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> replacexAsync(K key, V val) { + @Override public IgniteInternalFuture<Boolean> replacexAsync(K key, V val) { A.notNull(key, "key", val, "val"); return putxAsync(key, val, ctx.hasPeekArray()); @@ -393,7 +394,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { + @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); return putxAsync(key, newVal, ctx.equalsPeekArray(oldVal)); @@ -411,7 +412,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<GridCacheReturn<V>> removexAsync(K key, V val) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { A.notNull(key, "key", val, "val"); return removeAllAsync0(F.asList(key), null, null, true, true, ctx.equalsPeekArray(val)); @@ -419,7 +420,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { return updateAllAsync0(F.asMap(key, newVal), null, null, @@ -438,7 +439,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> putAllAsync(Map<? extends K, ? extends V> m, + @Override public IgniteInternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m, @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) { return updateAllAsync0(m, null, @@ -457,7 +458,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) { + @Override public IgniteInternalFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) { ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); return updateAllAsync0(null, @@ -479,7 +480,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx<K, V> entry, + @Override public IgniteInternalFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx<K, V> entry, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { A.notNull(key, "key"); @@ -493,7 +494,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> removeAllAsync(Collection<? extends K> keys, + @Override public IgniteInternalFuture<?> removeAllAsync(Collection<? extends K> keys, IgnitePredicate<CacheEntry<K, V>>[] filter) { A.notNull(keys, "keys"); @@ -508,7 +509,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx<K, V> entry, + @Override public IgniteInternalFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx<K, V> entry, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { A.notNull(key, "key"); @@ -521,7 +522,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> removeAsync(K key, V val) { + @Override public IgniteInternalFuture<Boolean> removeAsync(K key, V val) { A.notNull(key, "key", val, "val"); return removexAsync(key, ctx.equalsPeekArray(val)); @@ -533,7 +534,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> removeAllAsync(IgnitePredicate<CacheEntry<K, V>>[] filter) { + @Override public IgniteInternalFuture<?> removeAllAsync(IgnitePredicate<CacheEntry<K, V>>[] filter) { return removeAllAsync(keySet(filter), filter); } @@ -543,7 +544,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) { + @Override public IgniteInternalFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) { ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); return removeAllAsync0(null, drMap, null, false, false, null); @@ -561,8 +562,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @return Future. */ @SuppressWarnings("unchecked") - protected <T> IgniteFuture<T> asyncOp(final CO<IgniteFuture<T>> op) { - IgniteFuture<T> fail = asyncOpAcquire(); + protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) { + IgniteInternalFuture<T> fail = asyncOpAcquire(); if (fail != null) return fail; @@ -572,12 +573,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { holder.lock(); try { - IgniteFuture fut = holder.future(); + IgniteInternalFuture fut = holder.future(); if (fut != null && !fut.isDone()) { - IgniteFuture<T> f = new GridEmbeddedFuture<>(fut, - new C2<T, Exception, IgniteFuture<T>>() { - @Override public IgniteFuture<T> apply(T t, Exception e) { + IgniteInternalFuture<T> f = new GridEmbeddedFuture<>(fut, + new C2<T, Exception, IgniteInternalFuture<T>>() { + @Override public IgniteInternalFuture<T> apply(T t, Exception e) { return op.apply(); } }, ctx.kernalContext()); @@ -587,7 +588,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return f; } - IgniteFuture<T> f = op.apply(); + IgniteInternalFuture<T> f = op.apply(); saveFuture(holder, f); @@ -599,7 +600,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, + @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, @Nullable IgniteTxLocalEx<K, V> tx, boolean isInvalidate, @@ -628,7 +629,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, + @Override public <T> IgniteInternalFuture<EntryProcessorResult<T>> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) { A.notNull(key, "key", entryProcessor, "entryProcessor"); @@ -641,7 +642,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Map<? extends K, EntryProcessor> invokeMap = Collections.singletonMap(key, (EntryProcessor)entryProcessor); - IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, + IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, invokeMap, args, null, @@ -651,8 +652,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, null); - return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { - @Override public EntryProcessorResult<T> applyx(IgniteFuture<Map<K, EntryProcessorResult<T>>> fut) + return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { + @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) throws IgniteCheckedException { Map<K, EntryProcessorResult<T>> resMap = fut.get(); @@ -669,7 +670,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, + @Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, final EntryProcessor<K, V, T> entryProcessor, Object... args) { A.notNull(keys, "keys", entryProcessor, "entryProcessor"); @@ -704,7 +705,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + @Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { A.notNull(map, "map"); @@ -739,7 +740,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param filter Cache entry filter for atomic updates. * @return Completion future. */ - private IgniteFuture updateAllAsync0( + private IgniteInternalFuture updateAllAsync0( @Nullable final Map<? extends K, ? extends V> map, @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap, @Nullable Object[] invokeArgs, @@ -780,8 +781,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { subjId, taskNameHash); - return asyncOp(new CO<IgniteFuture<Object>>() { - @Override public IgniteFuture<Object> apply() { + return asyncOp(new CO<IgniteInternalFuture<Object>>() { + @Override public IgniteInternalFuture<Object> apply() { updateFut.map(); return updateFut; @@ -800,7 +801,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param filter Cache entry filter for atomic removes. * @return Completion future. */ - private IgniteFuture removeAllAsync0( + private IgniteInternalFuture removeAllAsync0( @Nullable final Collection<? extends K> keys, @Nullable final Map<? extends K, GridCacheVersion> drMap, @Nullable GridCacheEntryEx<K, V> cached, @@ -846,8 +847,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (statsEnabled) updateFut.listenAsync(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); - return asyncOp(new CO<IgniteFuture<Object>>() { - @Override public IgniteFuture<Object> apply() { + return asyncOp(new CO<IgniteInternalFuture<Object>>() { + @Override public IgniteInternalFuture<Object> apply() { updateFut.map(); return updateFut; @@ -868,7 +869,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param expiryPlc Expiry policy. * @return Get future. */ - private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys, + private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys, boolean reload, boolean forcePrimary, @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter, @@ -1015,13 +1016,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable final GridCacheEntryEx<K, V> cached, final CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb ) { - IgniteFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion()); + IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion()); if (forceFut.isDone()) updateAllAsyncInternal0(nodeId, req, completionCb); else { - forceFut.listenAsync(new CI1<IgniteFuture<Object>>() { - @Override public void apply(IgniteFuture<Object> t) { + forceFut.listenAsync(new CI1<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> t) { updateAllAsyncInternal0(nodeId, req, completionCb); } }); @@ -1776,7 +1777,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.addNearTtl(i, ttl, expireTime); if (updRes.newValue() != null || newValBytes != null) { - IgniteFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); + IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); assert f == null : f; } @@ -2049,7 +2050,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.addNearTtl(idx, updRes.newTtl(), -1); if (writeVal != null || !entry.valueBytes().isNull()) { - IgniteFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); + IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); assert f == null : f; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 539a462..5cd8d54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.lang.*; @@ -440,8 +441,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> snapshot = fut.topologySnapshot(); } else { - fut.listenAsync(new CI1<IgniteFuture<Long>>() { - @Override public void apply(IgniteFuture<Long> t) { + fut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + @Override public void apply(IgniteInternalFuture<Long> t) { mapOnTopology(keys, remap, oldNodeId); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 668f6fe..7811713 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -152,7 +153,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<Map<K, V>> getAllAsync( + @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( @Nullable final Collection<? extends K> keys, boolean forcePrimary, boolean skipTx, @@ -172,7 +173,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { - @Override public IgniteFuture<Map<K, V>> op(IgniteTxLocalAdapter<K, V> tx) { + @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter<K, V> tx) { return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, entry, deserializePortable, filter)); } }); @@ -238,7 +239,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param expiryPlc Expiry policy. * @return Loaded values. */ - public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys, + public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys, boolean readThrough, boolean reload, boolean forcePrimary, @@ -373,7 +374,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * * {@inheritDoc} */ - @Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, + @Override public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, @Nullable IgniteTxLocalEx<K, V> tx, boolean isInvalidate, @@ -606,7 +607,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param filter filter Optional filter. * @return Lock future. */ - IgniteFuture<Exception> lockAllAsync( + IgniteInternalFuture<Exception> lockAllAsync( final GridCacheContext<K, V> cacheCtx, @Nullable final GridNearTxLocal<K, V> tx, final long threadId, @@ -620,7 +621,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte ) { assert keys != null; - IgniteFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); + IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); // Prevent embedded future creation if possible. if (keyFut.isDone()) { @@ -645,8 +646,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } else { return new GridEmbeddedFuture<>(true, keyFut, - new C2<Object, Exception, IgniteFuture<Exception>>() { - @Override public IgniteFuture<Exception> apply(Object o, Exception exx) { + new C2<Object, Exception, IgniteInternalFuture<Exception>>() { + @Override public IgniteInternalFuture<Exception> apply(Object o, Exception exx) { if (exx != null) return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx); @@ -679,7 +680,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param filter filter Optional filter. * @return Lock future. */ - private IgniteFuture<Exception> lockAllAsync0( + private IgniteInternalFuture<Exception> lockAllAsync0( GridCacheContext<K, V> cacheCtx, @Nullable final GridNearTxLocal<K, V> tx, long threadId, @@ -766,7 +767,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (log.isDebugEnabled()) log.debug("Performing colocated lock [tx=" + tx + ", keys=" + keys + ']'); - IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx, + IgniteInternalFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx, keys, tx.implicit(), txRead, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index c88311c..e0edcad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -173,8 +174,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @return Participating nodes. */ @Override public Collection<? extends ClusterNode> nodes() { - return F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { + return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { if (isMini(f)) return ((MiniFuture)f).node(); @@ -371,7 +372,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity @Override public boolean onNodeLeft(UUID nodeId) { boolean found = false; - for (IgniteFuture<?> fut : futures()) { + for (IgniteInternalFuture<?> fut : futures()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -404,7 +405,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']'); - for (IgniteFuture<Boolean> fut : pending()) { + for (IgniteInternalFuture<Boolean> fut : pending()) { if (isMini(fut)) { MiniFuture mini = (MiniFuture)fut; @@ -536,7 +537,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<?> f) { + private boolean isMini(IgniteInternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } @@ -596,8 +597,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity markInitialized(); } else { - fut.listenAsync(new CI1<IgniteFuture<Long>>() { - @Override public void apply(IgniteFuture<Long> t) { + fut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + @Override public void apply(IgniteInternalFuture<Long> t) { mapOnTopology(); } }); @@ -859,7 +860,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity add(fut); // Append new future. - IgniteFuture<?> txSync = null; + IgniteInternalFuture<?> txSync = null; if (inTx()) txSync = cctx.tm().awaitFinishAckAsync(node.id(), tx.threadId()); @@ -878,8 +879,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } } else { - txSync.listenAsync(new CI1<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> t) { + txSync.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { try { if (log.isDebugEnabled()) log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); @@ -912,7 +913,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (log.isDebugEnabled()) log.debug("Before locally locking keys : " + keys); - IgniteFuture<Exception> fut = cctx.colocated().lockAllAsync(cctx, + IgniteInternalFuture<Exception> fut = cctx.colocated().lockAllAsync(cctx, tx, threadId, lockVer, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 67a5905..16a56e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; @@ -131,7 +132,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<?> f) { + private boolean isMini(IgniteInternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } @@ -156,7 +157,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec int type = evt.type(); - for (IgniteFuture<?> f : futures()) { + for (IgniteInternalFuture<?> f : futures()) { if (isMini(f)) { MiniFuture mini = (MiniFuture)f; @@ -180,7 +181,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec */ @SuppressWarnings( {"unchecked"}) public void onResult(UUID nodeId, GridDhtForceKeysResponse<K, V> res) { - for (IgniteFuture<Object> f : futures()) + for (IgniteInternalFuture<Object> f : futures()) if (isMini(f)) { MiniFuture mini = (MiniFuture)f; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 9c7652c..8a6fef8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; @@ -161,7 +162,7 @@ public class GridDhtPartitionDemandPool<K, V> { /** * @return Future for {@link org.apache.ignite.cache.CachePreloadMode#SYNC} mode. */ - IgniteFuture<?> syncFuture() { + IgniteInternalFuture<?> syncFuture() { return syncFut; } @@ -206,8 +207,8 @@ public class GridDhtPartitionDemandPool<K, V> { if (log.isDebugEnabled()) log.debug("Forcing preload event for future: " + exchFut); - exchFut.listenAsync(new CI1<IgniteFuture<Long>>() { - @Override public void apply(IgniteFuture<Long> t) { + exchFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + @Override public void apply(IgniteInternalFuture<Long> t) { cctx.shared().exchange().forcePreloadExchange(exchFut); } }); @@ -355,8 +356,8 @@ public class GridDhtPartitionDemandPool<K, V> { obj = new GridTimeoutObjectAdapter(delay) { @Override public void onTimeout() { - exchFut.listenAsync(new CI1<IgniteFuture<Long>>() { - @Override public void apply(IgniteFuture<Long> f) { + exchFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + @Override public void apply(IgniteInternalFuture<Long> f) { cctx.shared().exchange().forcePreloadExchange(exchFut); } }); @@ -810,7 +811,7 @@ public class GridDhtPartitionDemandPool<K, V> { int preloadOrder = cctx.config().getPreloadOrder(); if (preloadOrder > 0) { - IgniteFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(preloadOrder); + IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(preloadOrder); try { if (fut != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 0ebd0df..219737f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -21,10 +21,10 @@ import org.apache.ignite.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.timeout.*; @@ -130,7 +130,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon /** */ @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"}) @GridToStringInclude - private volatile IgniteFuture<?> partReleaseFut; + private volatile IgniteInternalFuture<?> partReleaseFut; /** */ private final Object mux = new Object(); @@ -377,7 +377,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon /** * @return Init future. */ - IgniteFuture<?> initFuture() { + IgniteInternalFuture<?> initFuture() { return initFut; } @@ -453,7 +453,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon cacheCtx.preloader().updateLastExchangeFuture(this); } - IgniteFuture<?> partReleaseFut = cctx.partitionReleaseFuture(topVer); + IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(topVer); // Assign to class variable so it will be included into toString() method. this.partReleaseFut = partReleaseFut; @@ -728,8 +728,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon } } else { - initFut.listenAsync(new CI1<IgniteFuture<Boolean>>() { - @Override public void apply(IgniteFuture<Boolean> t) { + initFut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> t) { try { if (!t.get()) // Just to check if there was an error. return; @@ -826,8 +826,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon assert exchId.topologyVersion() == msg.topologyVersion(); - initFut.listenAsync(new CI1<IgniteFuture<Boolean>>() { - @Override public void apply(IgniteFuture<Boolean> t) { + initFut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> t) { assert msg.lastVersion() != null; cctx.versions().onReceived(nodeId, msg.lastVersion()); @@ -886,8 +886,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon try { // Wait for initialization part of this future to complete. - initFut.listenAsync(new CI1<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> f) { + initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { if (isDone()) return;