http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxProxyImpl.java index 4e61db6..1ace06d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxProxyImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxProxyImpl.java @@ -190,7 +190,7 @@ public class GridCacheTxProxyImpl<K, V> implements GridCacheTxProxy, Externaliza } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheTx> commitAsync() { + @Override public InternalFuture<GridCacheTx> commitAsync() { enter(); try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java index d28f728..8724658 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java @@ -14,7 +14,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.fs.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; @@ -1052,10 +1051,10 @@ public class GridCacheUtils { * @param excl Excludes. * @return Future listener that logs errors. */ - public static IgniteInClosure<IgniteFuture<?>> errorLogger(final IgniteLogger log, + public static IgniteInClosure<InternalFuture<?>> errorLogger(final IgniteLogger log, final Class<? extends Exception>... excl) { - return new CI1<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> f) { + return new CI1<InternalFuture<?>>() { + @Override public void apply(InternalFuture<?> f) { try { f.get(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java index e9f6828..636488b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java @@ -495,9 +495,9 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li * @param key Key for which update is performed. * @param val New value, may be null for remove operation. * @param operation Updated value status - * @throws GridInterruptedException If interrupted while waiting for value to be flushed. + * @throws InternalInterruptedException If interrupted while waiting for value to be flushed. */ - private void updateCache(K key, @Nullable V val, StoreOperation operation) throws GridInterruptedException { + private void updateCache(K key, @Nullable V val, StoreOperation operation) throws InternalInterruptedException { StatefulValue<V> newVal = new StatefulValue<>(val, operation); StatefulValue<V> prev; @@ -724,7 +724,7 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li } /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, GridInterruptedException { + @Override protected void body() throws InterruptedException, InternalInterruptedException { while (!stopping.get() || writeCache.sizex() > 0) { awaitOperationsAvailable(); @@ -957,9 +957,9 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li /** * Awaits a signal on flush condition * - * @throws GridInterruptedException If thread was interrupted. + * @throws InternalInterruptedException If thread was interrupted. */ - private void waitForFlush() throws GridInterruptedException { + private void waitForFlush() throws InternalInterruptedException { U.await(flushCond); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicSequenceImpl.java index 8785b2a..cb32528 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicSequenceImpl.java @@ -11,7 +11,6 @@ package org.gridgain.grid.kernal.processors.cache.datastructures; import org.apache.ignite.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.datastructures.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -255,7 +254,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc * @throws IgniteCheckedException If update failed. */ @SuppressWarnings("SignalWithoutCorrespondingAwait") - private IgniteFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated) + private InternalFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteCheckedException { checkRemoved(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java index bedefa9..0e6763c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.future.*; @@ -210,7 +209,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound */ public void onResult(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse<K, V> res) { if (!isDone()) { - for (IgniteFuture<Boolean> fut : pending()) { + for (InternalFuture<Boolean> fut : pending()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -243,7 +242,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteFuture<?> fut : futures()) + for (InternalFuture<?> fut : futures()) if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -293,7 +292,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<?> f) { + private boolean isMini(InternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java index 1e31fdf..31bd481 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java @@ -159,7 +159,7 @@ public class GridCachePessimisticCheckCommittedTxFuture<K, V> extends GridCompou */ public void onResult(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) { if (!isDone()) { - for (IgniteFuture<GridCacheCommittedTxInfo<K, V>> fut : pending()) { + for (InternalFuture<GridCacheCommittedTxInfo<K, V>> fut : pending()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -192,7 +192,7 @@ public class GridCachePessimisticCheckCommittedTxFuture<K, V> extends GridCompou /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteFuture<?> fut : futures()) + for (InternalFuture<?> fut : futures()) if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -248,7 +248,7 @@ public class GridCachePessimisticCheckCommittedTxFuture<K, V> extends GridCompou * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<?> f) { + private boolean isMini(InternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxFinishSync.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxFinishSync.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxFinishSync.java index d71b4e9..6e0386e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxFinishSync.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxFinishSync.java @@ -11,7 +11,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.future.*; @@ -64,7 +63,7 @@ public class GridCacheTxFinishSync<K, V> { * @param threadId Thread ID to wait ack. * @return {@code null} if ack was received or future that will be completed when ack is received. */ - public IgniteFuture<?> awaitAckAsync(UUID nodeId, long threadId) { + public InternalFuture<?> awaitAckAsync(UUID nodeId, long threadId) { ThreadFinishSync threadSync = threadMap.get(threadId); if (threadSync == null) @@ -143,7 +142,7 @@ public class GridCacheTxFinishSync<K, V> { * @param nodeId Node ID to wait ack from. * @return {@code null} if ack has been received or future that will be completed when ack is received. */ - public IgniteFuture<?> awaitAckAsync(UUID nodeId) { + public InternalFuture<?> awaitAckAsync(UUID nodeId) { TxFinishSync sync = nodeMap.get(nodeId); if (sync == null) @@ -226,7 +225,7 @@ public class GridCacheTxFinishSync<K, V> { * * @return {@code null} if ack has been received, or future that will be completed when ack is received. */ - @Nullable public IgniteFuture<?> awaitAckAsync() { + @Nullable public InternalFuture<?> awaitAckAsync() { synchronized (this) { if (cnt == 0) return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java index 21e0b8b..514bac8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -49,7 +49,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> txLockAsync( + @Override public InternalFuture<Boolean> txLockAsync( Collection<? extends K> keys, long timeout, GridCacheTxLocalEx<K, V> tx, @@ -65,7 +65,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, + @Override public InternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, IgnitePredicate<GridCacheEntry<K, V>>... filter) { GridCacheTxLocalEx<K, V> tx = ctx.tm().userTxx(); @@ -84,7 +84,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param filter Optional filter. * @return Future for locks. */ - protected abstract IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, + protected abstract InternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, @Nullable GridCacheTxLocalEx<K, V> tx, boolean isInvalidate, boolean isRead, boolean retval, @Nullable GridCacheTxIsolation isolation, IgnitePredicate<GridCacheEntry<K, V>>[] filter); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 3cd3e2d..42ac6e8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -380,7 +380,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K, } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheTxEx<K, V>> prepareAsync() { + @Override public InternalFuture<GridCacheTxEx<K, V>> prepareAsync() { assert false; return null; } @@ -724,7 +724,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K, } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheTx> commitAsync() { + @Override public InternalFuture<GridCacheTx> commitAsync() { try { commit(); @@ -755,7 +755,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K, } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheTx> rollbackAsync() { + @Override public InternalFuture<GridCacheTx> rollbackAsync() { rollback(); return new GridFinishedFutureEx<GridCacheTx>(this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index c13f8f4..dd68d32 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -237,7 +237,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param topVer Topology version. * @return Finish future. */ - @Nullable public IgniteFuture<?> multiUpdateFinishFuture(long topVer) { + @Nullable public InternalFuture<?> multiUpdateFinishFuture(long topVer) { GridCompoundFuture<IgniteUuid, Object> fut = null; for (MultiUpdateFuture multiFut : multiTxFuts.values()) { @@ -425,7 +425,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param filter {@inheritDoc} * @return {@inheritDoc} */ - @Override public IgniteFuture<Map<K, V>> getAllAsync( + @Override public InternalFuture<Map<K, V>> getAllAsync( @Nullable Collection<? extends K> keys, boolean forcePrimary, boolean skipTx, @@ -455,7 +455,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param filter {@inheritDoc} * @return {@inheritDoc} */ - IgniteFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys, @Nullable UUID subjId, + InternalFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys, @Nullable UUID subjId, String taskName, boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { return getAllAsync(keys, null, /*don't check local tx. */false, subjId, taskName, deserializePortable, false, filter); @@ -488,12 +488,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest<K, V> req) { assert isAffinityNode(cacheCfg); - IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut = + InternalFuture<Collection<GridCacheEntryInfo<K, V>>> fut = getDhtAsync(nodeId, req.messageId(), req.keys(), req.reload(), req.topologyVersion(), req.subjectId(), req.taskNameHash(), false, req.filter()); - fut.listenAsync(new CI1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>>() { - @Override public void apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> f) { + fut.listenAsync(new CI1<InternalFuture<Collection<GridCacheEntryInfo<K, V>>>>() { + @Override public void apply(InternalFuture<Collection<GridCacheEntryInfo<K, V>>> f) { GridNearGetResponse<K, V> res = new GridNearGetResponse<>(ctx.cacheId(), req.futureId(), req.miniId(), req.version()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java index be2596d..d1ce358 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -345,7 +345,7 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * @throws GridCacheEntryRemovedException If entry was removed. */ @SuppressWarnings("unchecked") - @Nullable public IgniteFuture<Boolean> addReader(UUID nodeId, long msgId, long topVer) + @Nullable public InternalFuture<Boolean> addReader(UUID nodeId, long msgId, long topVer) throws GridCacheEntryRemovedException { // Don't add local node as reader. if (cctx.nodeId().equals(nodeId)) @@ -439,8 +439,8 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { if (!txFut.isDone()) { final ReaderId<K, V> reader0 = reader; - txFut.listenAsync(new CI1<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> f) { + txFut.listenAsync(new CI1<InternalFuture<?>>() { + @Override public void apply(InternalFuture<?> f) { synchronized (this) { // Release memory. reader0.resetTxFuture(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java index 204c03d..3ee0b26 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java @@ -41,7 +41,7 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem * @param embedded Embedded. * @param c Closure. */ - public GridDhtEmbeddedFuture(GridKernalContext ctx, IgniteFuture<B> embedded, IgniteBiClosure<B, Exception, A> c) { + public GridDhtEmbeddedFuture(GridKernalContext ctx, InternalFuture<B> embedded, IgniteBiClosure<B, Exception, A> c) { super(ctx, embedded, c); invalidParts = Collections.emptyList(); @@ -52,8 +52,8 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem * @param c Embedding closure. * @param ctx Kernal context. */ - public GridDhtEmbeddedFuture(IgniteFuture<B> embedded, - IgniteBiClosure<B, Exception, IgniteFuture<A>> c, GridKernalContext ctx) { + public GridDhtEmbeddedFuture(InternalFuture<B> embedded, + IgniteBiClosure<B, Exception, InternalFuture<A>> c, GridKernalContext ctx) { super(embedded, c, ctx); invalidParts = Collections.emptyList(); @@ -65,7 +65,7 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem * @param c Closure. * @param invalidParts Retries. */ - public GridDhtEmbeddedFuture(GridKernalContext ctx, IgniteFuture<B> embedded, IgniteBiClosure<B, Exception, A> c, + public GridDhtEmbeddedFuture(GridKernalContext ctx, InternalFuture<B> embedded, IgniteBiClosure<B, Exception, A> c, Collection<Integer> invalidParts) { super(ctx, embedded, c); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFuture.java index be2c93f..7ff6342 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtFuture.java @@ -16,7 +16,7 @@ import java.util.*; /** * Keys to retry. */ -public interface GridDhtFuture<T> extends IgniteFuture<T> { +public interface GridDhtFuture<T> extends InternalFuture<T> { /** * Node that future should be able to provide keys to retry before * it completes, so it's not necessary to wait till future is done http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java index 6db3540..7cb1f74 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -261,7 +261,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @return Future for local get. */ @SuppressWarnings( {"unchecked", "IfMayBeConditional"}) - private IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> getAsync(final LinkedHashMap<? extends K, Boolean> keys) { + private InternalFuture<Collection<GridCacheEntryInfo<K, V>>> getAsync(final LinkedHashMap<? extends K, Boolean> keys) { if (F.isEmpty(keys)) return new GridFinishedFuture<Collection<GridCacheEntryInfo<K, V>>>(cctx.kernalContext(), Collections.<GridCacheEntryInfo<K, V>>emptyList()); @@ -296,7 +296,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col // TODO: To fix, check that reader is contained in the list of readers once // TODO: again after the returned future completes - if not, try again. // TODO: Also, why is info read before transactions are complete, and not after? - IgniteFuture<Boolean> f = (!e.deleted() && k.getValue()) ? e.addReader(reader, msgId, topVer) : null; + InternalFuture<Boolean> f = (!e.deleted() && k.getValue()) ? e.addReader(reader, msgId, topVer) : null; if (f != null) { if (txFut == null) @@ -322,7 +322,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (txFut != null) txFut.markInitialized(); - IgniteFuture<Map<K, V>> fut; + InternalFuture<Map<K, V>> fut; if (txFut == null || txFut.isDone()) { if (reload && cctx.isStoreEnabled() && cctx.store().configured()) @@ -337,8 +337,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col // transactions to complete. fut = new GridEmbeddedFuture<>( txFut, - new C2<Boolean, Exception, IgniteFuture<Map<K, V>>>() { - @Override public IgniteFuture<Map<K, V>> apply(Boolean b, Exception e) { + new C2<Boolean, Exception, InternalFuture<Map<K, V>>>() { + @Override public InternalFuture<Map<K, V>> apply(Boolean b, Exception e) { if (e != null) throw new GridClosureException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLocalPartition.java index b77bae2..aaa2956 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -235,7 +235,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new GridInterruptedException(e); + throw new InternalInterruptedException(e); } } @@ -382,7 +382,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti * @param updateSeq Update sequence. * @return Future to signal that this node is no longer an owner or backup. */ - IgniteFuture<?> rent(boolean updateSeq) { + InternalFuture<?> rent(boolean updateSeq) { while (true) { int reservations = state.getStamp(); @@ -410,7 +410,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti * @param updateSeq Update sequence. * @return Future for evict attempt. */ - private IgniteFuture<Boolean> tryEvictAsync(boolean updateSeq) { + private InternalFuture<Boolean> tryEvictAsync(boolean updateSeq) { if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) { if (log.isDebugEnabled()) log.debug("Evicted partition: " + this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java index be4153a..818600b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; @@ -217,8 +216,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @return Participating nodes. */ @Override public Collection<? extends ClusterNode> nodes() { - return F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { + return F.viewReadOnly(futures(), new IgniteClosure<InternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(InternalFuture<?> f) { if (isMini(f)) return ((MiniFuture)f).node(); @@ -453,7 +452,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo @Override public boolean onNodeLeft(UUID nodeId) { boolean found = false; - for (IgniteFuture<?> fut : futures()) { + for (InternalFuture<?> fut : futures()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -479,7 +478,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo boolean found = false; - for (IgniteFuture<Boolean> fut : pending()) { + for (InternalFuture<Boolean> fut : pending()) { if (isMini(fut)) { MiniFuture mini = (MiniFuture)fut; @@ -695,7 +694,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<?> f) { + private boolean isMini(InternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTopologyFuture.java index cf3ad80..3b0d3e1 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTopologyFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTopologyFuture.java @@ -21,7 +21,7 @@ import org.gridgain.grid.kernal.managers.discovery.*; * When new new transaction is started, it will wait for this future before acquiring new locks on particular * topology version. */ -public interface GridDhtTopologyFuture extends IgniteFuture<Long> { +public interface GridDhtTopologyFuture extends InternalFuture<Long> { /** * Gets a topology snapshot for the topology version represented by the future. Note that by the time * partition exchange completes some nodes from the snapshot may leave the grid. One should use discovery http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 69500dd..217a56d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -329,14 +329,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param req Request. */ protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest<K, V> req) { - IgniteFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : + InternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion()); if (keyFut == null || keyFut.isDone()) processDhtLockRequest0(nodeId, req); else { - keyFut.listenAsync(new CI1<IgniteFuture<Object>>() { - @Override public void apply(IgniteFuture<Object> t) { + keyFut.listenAsync(new CI1<InternalFuture<Object>>() { + @Override public void apply(InternalFuture<Object> t) { processDhtLockRequest0(nodeId, req); } }); @@ -507,7 +507,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } // Group lock can be only started from local node, so we never start group lock transaction on remote node. - IgniteFuture<?> f = lockAllAsync(ctx, nearNode, req, null); + InternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null); // Register listener just so we print out errors. // Exclude lock timeout exception since it's not a fatal exception. @@ -536,7 +536,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys, + @Override public InternalFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys, long timeout, GridCacheTxLocalEx<K, V> txx, boolean isInvalidate, @@ -638,14 +638,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param filter0 Filter. * @return Future. */ - public IgniteFuture<GridNearLockResponse<K, V>> lockAllAsync( + public InternalFuture<GridNearLockResponse<K, V>> lockAllAsync( final GridCacheContext<K, V> cacheCtx, final ClusterNode nearNode, final GridNearLockRequest<K, V> req, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter0) { final List<K> keys = req.keys(); - IgniteFuture<Object> keyFut = null; + InternalFuture<Object> keyFut = null; if (req.onePhaseCommit()) { boolean forceKeys = req.hasTransforms() || req.filter() != null; @@ -663,8 +663,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach keyFut = new GridFinishedFutureEx<>(); return new GridEmbeddedFuture<>(true, keyFut, - new C2<Object, Exception, IgniteFuture<GridNearLockResponse<K,V>>>() { - @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(Object o, Exception exx) { + new C2<Object, Exception, InternalFuture<GridNearLockResponse<K,V>>>() { + @Override public InternalFuture<GridNearLockResponse<K, V>> apply(Object o, Exception exx) { if (exx != null) return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx); @@ -792,7 +792,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert req.writeEntries() == null || req.writeEntries().size() == entries.size(); - IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync( + InternalFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync( cacheCtx, entries, req.writeEntries(), @@ -806,8 +806,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach return new GridDhtEmbeddedFuture<>( txFut, - new C2<GridCacheReturn<V>, Exception, IgniteFuture<GridNearLockResponse<K, V>>>() { - @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(GridCacheReturn<V> o, + new C2<GridCacheReturn<V>, Exception, InternalFuture<GridNearLockResponse<K, V>>>() { + @Override public InternalFuture<GridNearLockResponse<K, V>> apply(GridCacheReturn<V> o, Exception e) { if (e != null) e = U.unwrap(e); @@ -822,8 +822,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert t.implicit(); return t.commitAsync().chain( - new C1<IgniteFuture<GridCacheTx>, GridNearLockResponse<K, V>>() { - @Override public GridNearLockResponse<K, V> apply(IgniteFuture<GridCacheTx> f) { + new C1<InternalFuture<GridCacheTx>, GridNearLockResponse<K, V>>() { + @Override public GridNearLockResponse<K, V> apply(InternalFuture<GridCacheTx> f) { try { // Check for error. f.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 2e9311d..0464ff4 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; @@ -112,8 +111,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur */ @Override public Collection<? extends ClusterNode> nodes() { return - F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { + F.viewReadOnly(futures(), new IgniteClosure<InternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(InternalFuture<?> f) { if (isMini(f)) return ((MiniFuture)f).node(); @@ -124,7 +123,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteFuture<?> fut : futures()) + for (InternalFuture<?> fut : futures()) if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -189,7 +188,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur */ public void onResult(UUID nodeId, GridDhtTxFinishResponse<K, V> res) { if (!isDone()) { - for (IgniteFuture<GridCacheTx> fut : futures()) { + for (InternalFuture<GridCacheTx> fut : futures()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -229,7 +228,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<?> f) { + private boolean isMini(InternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java index b53c1c8..587b2cd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -232,7 +232,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheTxEx<K, V>> future() { + @Override public InternalFuture<GridCacheTxEx<K, V>> future() { return prepFut.get(); } @@ -244,7 +244,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } /** {@inheritDoc} */ - @Override @Nullable protected IgniteFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, + @Override @Nullable protected InternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, GridCacheTxEntry<K, V> entry, long topVer) { // Don't add local node as reader. if (!cctx.localNodeId().equals(nearNodeId)) { @@ -274,7 +274,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheTxEx<K, V>> prepareAsync() { + @Override public InternalFuture<GridCacheTxEx<K, V>> prepareAsync() { if (optimistic()) { assert isSystemInvalidate(); @@ -346,7 +346,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements * @param lastBackups IDs of backup nodes receiving last prepare request. * @return Future that will be completed when locks are acquired. */ - public IgniteFuture<GridCacheTxEx<K, V>> prepareAsync(@Nullable Iterable<GridCacheTxEntry<K, V>> reads, + public InternalFuture<GridCacheTxEx<K, V>> prepareAsync(@Nullable Iterable<GridCacheTxEntry<K, V>> reads, @Nullable Iterable<GridCacheTxEntry<K, V>> writes, Map<GridCacheTxKey<K>, GridCacheVersion> verMap, long msgId, IgniteUuid nearMiniId, Map<UUID, Collection<UUID>> txNodes, boolean last, Collection<UUID> lastBackups) { assert optimistic(); @@ -442,7 +442,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements /** {@inheritDoc} */ @SuppressWarnings({"ThrowableInstanceNeverThrown"}) - @Override public IgniteFuture<GridCacheTx> commitAsync() { + @Override public InternalFuture<GridCacheTx> commitAsync() { if (log.isDebugEnabled()) log.debug("Committing dht local tx: " + this); @@ -479,8 +479,8 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } } else - prep.listenAsync(new CI1<IgniteFuture<GridCacheTxEx<K, V>>>() { - @Override public void apply(IgniteFuture<GridCacheTxEx<K, V>> f) { + prep.listenAsync(new CI1<InternalFuture<GridCacheTxEx<K, V>>>() { + @Override public void apply(InternalFuture<GridCacheTxEx<K, V>> f) { try { f.get(); // Check for errors of a parent future. @@ -537,7 +537,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheTx> rollbackAsync() { + @Override public InternalFuture<GridCacheTx> rollbackAsync() { GridDhtTxPrepareFuture<K, V> prepFut = this.prepFut.get(); final GridDhtTxFinishFuture<K, V> fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); @@ -567,8 +567,8 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements else { prepFut.complete(); - prepFut.listenAsync(new CI1<IgniteFuture<GridCacheTxEx<K, V>>>() { - @Override public void apply(IgniteFuture<GridCacheTxEx<K, V>> f) { + prepFut.listenAsync(new CI1<InternalFuture<GridCacheTxEx<K, V>>>() { + @Override public void apply(InternalFuture<GridCacheTxEx<K, V>> f) { try { f.get(); // Check for errors of a parent future. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index d594df3..8fdb5af 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -130,7 +130,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte * @param entry Transaction entry. * @return {@code True} if reader was added as a result of this call. */ - @Nullable protected abstract IgniteFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, + @Nullable protected abstract InternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, GridCacheTxEntry<K, V> entry, long topVer); /** @@ -386,7 +386,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte * @return Future for active transactions for the time when reader was added. * @throws IgniteCheckedException If failed. */ - @Nullable public IgniteFuture<Boolean> addEntry(long msgId, GridCacheTxEntry<K, V> e) throws IgniteCheckedException { + @Nullable public InternalFuture<Boolean> addEntry(long msgId, GridCacheTxEntry<K, V> e) throws IgniteCheckedException { init(); GridCacheTxState state = state(); @@ -473,7 +473,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte * @param read Read flag. * @return Lock future. */ - IgniteFuture<GridCacheReturn<V>> lockAllAsync( + InternalFuture<GridCacheReturn<V>> lockAllAsync( GridCacheContext<K, V> cacheCtx, Collection<GridCacheEntryEx<K, V>> entries, List<GridCacheTxEntry<K, V>> writeEntries, @@ -583,7 +583,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte * @param filter Entry write filter. * @return Future for lock acquisition. */ - private IgniteFuture<GridCacheReturn<V>> obtainLockAsync( + private InternalFuture<GridCacheReturn<V>> obtainLockAsync( final GridCacheContext<K, V> cacheCtx, GridCacheReturn<V> ret, final Collection<? extends K> passedKeys, @@ -599,7 +599,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte GridDhtTransactionalCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx(); - IgniteFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys, + InternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys, lockTimeout(), this, isInvalidate(), read, /*retval*/false, isolation, CU.<K, V>empty()); return new GridEmbeddedFuture<>( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index f872cf9..1ff5d52 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -165,8 +165,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu */ @Override public Collection<? extends ClusterNode> nodes() { return - F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { + F.viewReadOnly(futures(), new IgniteClosure<InternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(InternalFuture<?> f) { if (isMini(f)) return ((MiniFuture)f).node(); @@ -237,7 +237,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteFuture<?> fut : futures()) + for (InternalFuture<?> fut : futures()) if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -295,7 +295,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu */ public void onResult(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) { if (!isDone()) { - for (IgniteFuture<GridCacheTxEx<K, V>> fut : pending()) { + for (InternalFuture<GridCacheTxEx<K, V>> fut : pending()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -406,7 +406,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu try { get(); } - catch (GridInterruptedException e) { + catch (InternalInterruptedException e) { onError(new IgniteCheckedException("Got interrupted while waiting for replies to be sent.", e)); } catch (IgniteCheckedException ignored) { @@ -503,7 +503,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<?> f) { + private boolean isMini(InternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index f79bc2f..c097871 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.near.*; @@ -180,8 +179,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M @SuppressWarnings("unchecked") @Override public Collection<? extends ClusterNode> nodes() { return - F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<Map<K, V>>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<Map<K, V>> f) { + F.viewReadOnly(futures(), new IgniteClosure<InternalFuture<Map<K, V>>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(InternalFuture<Map<K, V>> f) { if (isMini(f)) return ((MiniFuture)f).node(); @@ -192,7 +191,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteFuture<Map<K, V>> fut : futures()) + for (InternalFuture<Map<K, V>> fut : futures()) if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -211,7 +210,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param res Result. */ public void onResult(UUID nodeId, GridNearGetResponse<K, V> res) { - for (IgniteFuture<Map<K, V>> fut : futures()) + for (InternalFuture<Map<K, V>> fut : futures()) if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -240,7 +239,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<Map<K, V>> f) { + private boolean isMini(InternalFuture<Map<K, V>> f) { return f.getClass().equals(MiniFuture.class); } @@ -317,8 +316,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M } // Add new future. - add(fut.chain(new C1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>, Map<K, V>>() { - @Override public Map<K, V> apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut) { + add(fut.chain(new C1<InternalFuture<Collection<GridCacheEntryInfo<K, V>>>, Map<K, V>>() { + @Override public Map<K, V> apply(InternalFuture<Collection<GridCacheEntryInfo<K, V>>> fut) { try { return createResultMap(fut.get()); } @@ -650,11 +649,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']'); // Need to wait for next topology version to remap. - IgniteFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer); + InternalFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer); - topFut.listenAsync(new CIX1<IgniteFuture<Long>>() { + topFut.listenAsync(new CIX1<InternalFuture<Long>>() { @SuppressWarnings("unchecked") - @Override public void applyx(IgniteFuture<Long> fut) throws IgniteCheckedException { + @Override public void applyx(InternalFuture<Long> fut) throws IgniteCheckedException { long topVer = fut.get(); // This will append new futures to compound list. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 997671b..3b0d6e9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -13,8 +13,6 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.security.*; -import org.apache.ignite.portables.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.managers.communication.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -249,7 +247,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<Map<K, V>> getAllAsync( + @Override public InternalFuture<Map<K, V>> getAllAsync( @Nullable final Collection<? extends K> keys, final boolean forcePrimary, boolean skipTx, @@ -263,8 +261,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final UUID subjId0 = subjId; - return asyncOp(new CO<IgniteFuture<Map<K, V>>>() { - @Override public IgniteFuture<Map<K, V>> apply() { + return asyncOp(new CO<InternalFuture<Map<K, V>>>() { + @Override public InternalFuture<Map<K, V>> apply() { return getAllAsync0(keys, false, forcePrimary, filter, subjId0, taskName, deserializePortable); } }); @@ -290,14 +288,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, + @Override public InternalFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, long ttl, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) { return updateAllAsync0(F0.asMap(key, val), null, null, null, true, false, entry, ttl, filter); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, long ttl, + @Override public InternalFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, long ttl, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) { return updateAllAsync0(F0.asMap(key, val), null, null, null, false, false, entry, ttl, filter); } @@ -308,7 +306,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<V> putIfAbsentAsync(K key, V val) { + @Override public InternalFuture<V> putIfAbsentAsync(K key, V val) { return putAsync(key, val, ctx.noPeekArray()); } @@ -318,7 +316,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> putxIfAbsentAsync(K key, V val) { + @Override public InternalFuture<Boolean> putxIfAbsentAsync(K key, V val) { return putxAsync(key, val, ctx.noPeekArray()); } @@ -328,7 +326,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<V> replaceAsync(K key, V val) { + @Override public InternalFuture<V> replaceAsync(K key, V val) { return putAsync(key, val, ctx.hasPeekArray()); } @@ -338,7 +336,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> replacexAsync(K key, V val) { + @Override public InternalFuture<Boolean> replacexAsync(K key, V val) { return putxAsync(key, val, ctx.hasPeekArray()); } @@ -348,7 +346,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { + @Override public InternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { return putxAsync(key, newVal, ctx.equalsPeekArray(oldVal)); } @@ -364,13 +362,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<GridCacheReturn<V>> removexAsync(K key, V val) { + @Override public InternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { return removeAllAsync0(F.asList(key), null, null, true, true, ctx.equalsPeekArray(val)); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { + @Override public InternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { return updateAllAsync0(F.asMap(key, newVal), null, null, null, true, true, null, 0, ctx.equalsPeekArray(oldVal)); } @@ -382,7 +380,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> putAllAsync(Map<? extends K, ? extends V> m, + @Override public InternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { return updateAllAsync0(m, null, null, null, false, false, null, 0, filter); } @@ -393,7 +391,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) { + @Override public InternalFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) { ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); return updateAllAsync0(null, null, drMap, null, false, false, null, 0, null); @@ -413,7 +411,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer, + @Override public InternalFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer, @Nullable GridCacheEntryEx<K, V> entry, long ttl) { return updateAllAsync0(null, Collections.singletonMap(key, transformer), null, null, false, false, entry, ttl, null); @@ -425,7 +423,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) { + @Override public InternalFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) { if (F.isEmpty(m)) return new GridFinishedFuture<Object>(ctx.kernalContext()); @@ -440,7 +438,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx<K, V> entry, + @Override public InternalFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx<K, V> entry, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) { return removeAllAsync0(Collections.singletonList(key), null, entry, true, false, filter); } @@ -452,7 +450,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> removeAllAsync(Collection<? extends K> keys, + @Override public InternalFuture<?> removeAllAsync(Collection<? extends K> keys, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { return removeAllAsync0(keys, null, null, false, false, filter); } @@ -465,7 +463,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx<K, V> entry, + @Override public InternalFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx<K, V> entry, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) { return removeAllAsync0(Collections.singletonList(key), null, entry, false, false, filter); } @@ -476,7 +474,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> removeAsync(K key, V val) { + @Override public InternalFuture<Boolean> removeAsync(K key, V val) { return removexAsync(key, ctx.equalsPeekArray(val)); } @@ -486,7 +484,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> removeAllAsync(IgnitePredicate<GridCacheEntry<K, V>>[] filter) { + @Override public InternalFuture<?> removeAllAsync(IgnitePredicate<GridCacheEntry<K, V>>[] filter) { return removeAllAsync(keySet(filter), filter); } @@ -496,7 +494,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) { + @Override public InternalFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) { ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); return removeAllAsync0(null, drMap, null, false, false, null); @@ -514,8 +512,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @return Future. */ @SuppressWarnings("unchecked") - protected <T> IgniteFuture<T> asyncOp(final CO<IgniteFuture<T>> op) { - IgniteFuture<T> fail = asyncOpAcquire(); + protected <T> InternalFuture<T> asyncOp(final CO<InternalFuture<T>> op) { + InternalFuture<T> fail = asyncOpAcquire(); if (fail != null) return fail; @@ -525,12 +523,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { holder.lock(); try { - IgniteFuture fut = holder.future(); + InternalFuture fut = holder.future(); if (fut != null && !fut.isDone()) { - IgniteFuture<T> f = new GridEmbeddedFuture<>(fut, - new C2<T, Exception, IgniteFuture<T>>() { - @Override public IgniteFuture<T> apply(T t, Exception e) { + InternalFuture<T> f = new GridEmbeddedFuture<>(fut, + new C2<T, Exception, InternalFuture<T>>() { + @Override public InternalFuture<T> apply(T t, Exception e) { return op.apply(); } }, ctx.kernalContext()); @@ -540,7 +538,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return f; } - IgniteFuture<T> f = op.apply(); + InternalFuture<T> f = op.apply(); saveFuture(holder, f); @@ -552,7 +550,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, + @Override protected InternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, @Nullable GridCacheTxLocalEx<K, V> tx, boolean isInvalidate, @@ -578,7 +576,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param filter Cache entry filter for atomic updates. * @return Completion future. */ - private IgniteFuture updateAllAsync0( + private InternalFuture updateAllAsync0( @Nullable final Map<? extends K, ? extends V> map, @Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> transformMap, @Nullable final Map<? extends K, GridCacheDrInfo<V>> drPutMap, @@ -616,8 +614,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { subjId, taskNameHash); - return asyncOp(new CO<IgniteFuture<Object>>() { - @Override public IgniteFuture<Object> apply() { + return asyncOp(new CO<InternalFuture<Object>>() { + @Override public InternalFuture<Object> apply() { updateFut.map(); return updateFut; @@ -636,7 +634,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param filter Cache entry filter for atomic removes. * @return Completion future. */ - private IgniteFuture removeAllAsync0( + private InternalFuture removeAllAsync0( @Nullable final Collection<? extends K> keys, @Nullable final Map<? extends K, GridCacheVersion> drMap, @Nullable GridCacheEntryEx<K, V> cached, @@ -672,8 +670,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { subjId, taskNameHash); - return asyncOp(new CO<IgniteFuture<Object>>() { - @Override public IgniteFuture<Object> apply() { + return asyncOp(new CO<InternalFuture<Object>>() { + @Override public InternalFuture<Object> apply() { updateFut.map(); return updateFut; @@ -690,7 +688,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param filter Filter. * @return Get future. */ - private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys, boolean reload, + private InternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys, boolean reload, boolean forcePrimary, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, UUID subjId, String taskName, boolean deserializePortable) { ctx.checkSecurity(GridSecurityPermission.CACHE_READ); @@ -809,13 +807,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable final GridCacheEntryEx<K, V> cached, final CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb ) { - IgniteFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion()); + InternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion()); if (forceFut.isDone()) updateAllAsyncInternal0(nodeId, req, cached, completionCb); else { - forceFut.listenAsync(new CI1<IgniteFuture<Object>>() { - @Override public void apply(IgniteFuture<Object> t) { + forceFut.listenAsync(new CI1<InternalFuture<Object>>() { + @Override public void apply(InternalFuture<Object> t) { updateAllAsyncInternal0(nodeId, req, cached, completionCb); } }); @@ -1401,7 +1399,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.addNearValue(i, updRes.newValue(), newValBytes); if (updRes.newValue() != null || newValBytes != null) { - IgniteFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); + InternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); assert f == null : f; } @@ -1628,7 +1626,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.nearTtl(req.ttl()); if (writeVal != null || !entry.valueBytes().isNull()) { - IgniteFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); + InternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); assert f == null : f; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index d660112..78fc105 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.managers.discovery.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -413,8 +412,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> snapshot = fut.topologySnapshot(); } else { - fut.listenAsync(new CI1<IgniteFuture<Long>>() { - @Override public void apply(IgniteFuture<Long> t) { + fut.listenAsync(new CI1<InternalFuture<Long>>() { + @Override public void apply(InternalFuture<Long> t) { mapOnTopology(keys, remap, oldNodeId); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index c028efd..c33798d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -141,7 +141,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<Map<K, V>> getAllAsync( + @Override public InternalFuture<Map<K, V>> getAllAsync( @Nullable final Collection<? extends K> keys, boolean forcePrimary, boolean skipTx, @@ -161,7 +161,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { - @Override public IgniteFuture<Map<K, V>> op(GridCacheTxLocalAdapter<K, V> tx) { + @Override public InternalFuture<Map<K, V>> op(GridCacheTxLocalAdapter<K, V> tx) { return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, entry, deserializePortable, filter)); } }); @@ -210,7 +210,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param filter Filter. * @return Loaded values. */ - public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys, boolean reload, + public InternalFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys, boolean reload, boolean forcePrimary, long topVer, @Nullable UUID subjId, String taskName, boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { if (keys == null || keys.isEmpty()) @@ -314,7 +314,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * * {@inheritDoc} */ - @Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, + @Override public InternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, @Nullable GridCacheTxLocalEx<K, V> tx, boolean isInvalidate, boolean isRead, boolean retval, @Nullable GridCacheTxIsolation isolation, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { assert tx == null || tx instanceof GridNearTxLocal; @@ -533,7 +533,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param filter filter Optional filter. * @return Lock future. */ - IgniteFuture<Exception> lockAllAsync( + InternalFuture<Exception> lockAllAsync( final GridCacheContext<K, V> cacheCtx, @Nullable final GridNearTxLocal<K, V> tx, final long threadId, @@ -546,7 +546,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte ) { assert keys != null; - IgniteFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); + InternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); // Prevent embedded future creation if possible. if (keyFut.isDone()) { @@ -562,8 +562,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } else { return new GridEmbeddedFuture<>(true, keyFut, - new C2<Object, Exception, IgniteFuture<Exception>>() { - @Override public IgniteFuture<Exception> apply(Object o, Exception exx) { + new C2<Object, Exception, InternalFuture<Exception>>() { + @Override public InternalFuture<Exception> apply(Object o, Exception exx) { if (exx != null) return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx); @@ -585,7 +585,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param filter filter Optional filter. * @return Lock future. */ - private IgniteFuture<Exception> lockAllAsync0( + private InternalFuture<Exception> lockAllAsync0( GridCacheContext<K, V> cacheCtx, @Nullable final GridNearTxLocal<K, V> tx, long threadId, final GridCacheVersion ver, final long topVer, final Collection<K> keys, final boolean txRead, @@ -657,7 +657,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (log.isDebugEnabled()) log.debug("Performing colocated lock [tx=" + tx + ", keys=" + keys + ']'); - IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx, keys, tx.implicit(), txRead); + InternalFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx, keys, tx.implicit(), txRead); return new GridDhtEmbeddedFuture<>( ctx.kernalContext(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index d27ee3d..8464448 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht.colocated; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.managers.discovery.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -157,8 +156,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity */ @Override public Collection<? extends ClusterNode> nodes() { return - F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { + F.viewReadOnly(futures(), new IgniteClosure<InternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(InternalFuture<?> f) { if (isMini(f)) return ((MiniFuture)f).node(); @@ -335,7 +334,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity @Override public boolean onNodeLeft(UUID nodeId) { boolean found = false; - for (IgniteFuture<?> fut : futures()) { + for (InternalFuture<?> fut : futures()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -368,7 +367,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']'); - for (IgniteFuture<Boolean> fut : pending()) { + for (InternalFuture<Boolean> fut : pending()) { if (isMini(fut)) { MiniFuture mini = (MiniFuture)fut; @@ -500,7 +499,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @param f Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteFuture<?> f) { + private boolean isMini(InternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } @@ -560,8 +559,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity markInitialized(); } else { - fut.listenAsync(new CI1<IgniteFuture<Long>>() { - @Override public void apply(IgniteFuture<Long> t) { + fut.listenAsync(new CI1<InternalFuture<Long>>() { + @Override public void apply(InternalFuture<Long> t) { mapOnTopology(); } }); @@ -822,7 +821,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity add(fut); // Append new future. - IgniteFuture<?> txSync = null; + InternalFuture<?> txSync = null; if (inTx()) txSync = cctx.tm().awaitFinishAckAsync(node.id(), tx.threadId()); @@ -841,8 +840,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } } else { - txSync.listenAsync(new CI1<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> t) { + txSync.listenAsync(new CI1<InternalFuture<?>>() { + @Override public void apply(InternalFuture<?> t) { try { if (log.isDebugEnabled()) log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); @@ -875,7 +874,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (log.isDebugEnabled()) log.debug("Before locally locking keys : " + keys); - IgniteFuture<Exception> fut = cctx.colocated().lockAllAsync(cctx, tx, threadId, lockVer, + InternalFuture<Exception> fut = cctx.colocated().lockAllAsync(cctx, tx, threadId, lockVer, topVer, keys, read, timeout, filter); // Add new future.