futures: api cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/04a317be Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/04a317be Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/04a317be Branch: refs/heads/ignite-47 Commit: 04a317be7c033354de94b636de834bafde237fb3 Parents: 387e164 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Thu Mar 5 14:52:55 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Thu Mar 5 14:52:56 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/IgniteInternalFuture.java | 2 +- .../internal/client/GridClientFuture.java | 9 +- .../client/impl/GridClientDataImpl.java | 2 +- .../client/impl/GridClientFutureAdapter.java | 23 +- .../connection/GridClientNioTcpConnection.java | 2 +- .../impl/GridTcpRouterNioListenerAdapter.java | 2 +- .../internal/cluster/IgniteClusterImpl.java | 2 +- .../internal/executor/GridExecutorService.java | 2 +- .../processors/cache/GridCacheAdapter.java | 28 +- .../cache/GridCacheEvictionManager.java | 2 +- .../processors/cache/GridCacheIoManager.java | 68 +++-- .../cache/GridCacheMultiTxFuture.java | 2 +- .../processors/cache/GridCacheMvccManager.java | 2 +- .../GridCachePartitionExchangeManager.java | 4 +- .../processors/cache/IgniteCacheProxy.java | 2 +- .../GridDistributedCacheAdapter.java | 2 +- .../distributed/dht/GridDhtCacheAdapter.java | 2 +- .../distributed/dht/GridDhtCacheEntry.java | 4 +- .../dht/GridDhtTransactionalCacheAdapter.java | 4 +- .../cache/distributed/dht/GridDhtTxLocal.java | 4 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../dht/GridPartitionedGetFuture.java | 4 +- .../dht/atomic/GridDhtAtomicCache.java | 4 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +- .../colocated/GridDhtColocatedLockFuture.java | 4 +- .../preloader/GridDhtPartitionDemandPool.java | 4 +- .../GridDhtPartitionsExchangeFuture.java | 8 +- .../dht/preloader/GridDhtPreloader.java | 13 +- .../distributed/near/GridNearGetFuture.java | 4 +- .../distributed/near/GridNearLockFuture.java | 4 +- .../cache/distributed/near/GridNearTxLocal.java | 9 +- .../near/GridNearTxPrepareFuture.java | 2 +- .../local/atomic/GridLocalAtomicCache.java | 2 +- .../query/GridCacheDistributedQueryManager.java | 4 +- .../cache/query/GridCacheQueryManager.java | 7 +- .../cache/transactions/IgniteTxHandler.java | 10 +- .../cache/transactions/IgniteTxManager.java | 4 +- .../dataload/GridDataLoaderProcessor.java | 2 +- .../dataload/IgniteDataLoaderImpl.java | 12 +- .../processors/igfs/IgfsDataManager.java | 7 +- .../processors/igfs/IgfsInputStreamImpl.java | 2 +- .../processors/igfs/IgfsMetaManager.java | 2 +- .../internal/processors/igfs/IgfsServer.java | 3 +- .../processors/rest/GridRestProcessor.java | 2 +- .../handlers/task/GridTaskCommandHandler.java | 2 +- .../tcp/GridTcpMemcachedNioListener.java | 2 +- .../protocols/tcp/GridTcpRestNioListener.java | 6 +- .../processors/streamer/IgniteStreamerImpl.java | 2 +- .../ignite/internal/util/IgniteUtils.java | 2 +- .../util/future/GridCompoundFuture.java | 2 +- .../util/future/GridEmbeddedFuture.java | 12 +- .../util/future/GridFinishedFuture.java | 4 +- .../internal/util/future/GridFutureAdapter.java | 4 +- .../internal/util/future/IgniteFutureImpl.java | 2 +- .../ignite/internal/util/lang/GridFunc.java | 2 +- .../internal/util/lang/GridPlainFuture.java | 79 ----- .../util/lang/GridPlainFutureAdapter.java | 299 ------------------- .../util/nio/GridNioEmbeddedFuture.java | 2 +- .../util/nio/GridNioFinishedFuture.java | 2 +- .../ignite/internal/util/nio/GridNioFuture.java | 2 +- .../internal/util/nio/GridNioFutureImpl.java | 2 +- .../ignite/internal/util/nio/GridNioServer.java | 2 +- .../GridCacheFinishPartitionsSelfTest.java | 6 +- .../processors/igfs/IgfsAbstractSelfTest.java | 26 +- .../igfs/IgfsDualAbstractSelfTest.java | 18 +- .../util/future/GridFutureAdapterSelfTest.java | 10 +- .../future/GridFutureListenPerformanceTest.java | 22 +- .../util/future/nio/GridNioFutureSelfTest.java | 4 +- .../lang/GridFutureListenPerformanceTest.java | 2 +- .../ignite/messaging/GridMessagingSelfTest.java | 12 +- .../processors/hadoop/igfs/HadoopIgfsEx.java | 4 +- .../hadoop/igfs/HadoopIgfsFuture.java | 4 +- .../hadoop/igfs/HadoopIgfsInProc.java | 9 +- .../hadoop/igfs/HadoopIgfsInputStream.java | 6 +- .../processors/hadoop/igfs/HadoopIgfsIo.java | 6 +- .../processors/hadoop/igfs/HadoopIgfsIpcIo.java | 5 +- .../hadoop/igfs/HadoopIgfsOutProc.java | 47 +-- .../hadoop/jobtracker/HadoopJobTracker.java | 8 +- .../proto/HadoopProtocolJobStatusTask.java | 2 +- .../hadoop/shuffle/HadoopShuffleJob.java | 2 +- .../external/HadoopExternalTaskExecutor.java | 11 +- .../child/HadoopChildProcessRunner.java | 8 +- .../HadoopExternalCommunication.java | 2 +- 83 files changed, 271 insertions(+), 656 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java index 969d905..255a067 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java @@ -111,7 +111,7 @@ public interface IgniteInternalFuture<R> { * * @param lsnr Listener closure to register. If not provided - this method is no-op. */ - public void listenAsync(@Nullable IgniteInClosure<? super IgniteInternalFuture<R>> lsnr); + public void listen(@Nullable IgniteInClosure<? super IgniteInternalFuture<R>> lsnr); /** * Make a chained future to convert result of this future (when complete) into a new format. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientFuture.java index 3ffb37e..4f40fae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientFuture.java @@ -57,12 +57,5 @@ public interface GridClientFuture<R> { * * @param lsnrs Listeners to be registered. */ - public void listenAsync(GridClientFutureListener<R>... lsnrs); - - /** - * Removes listeners registered before. - * - * @param lsnrs Listeners to be removed. - */ - public void stopListenAsync(GridClientFutureListener<R>... lsnrs); + public void listen(GridClientFutureListener<R>... lsnrs); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientDataImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientDataImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientDataImpl.java index 3ace38e..5502ed7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientDataImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientDataImpl.java @@ -280,7 +280,7 @@ public class GridClientDataImpl extends GridClientAbstractProjection<GridClientD }); if (cacheMetrics) - fut.listenAsync(new GridClientFutureListener<GridClientDataMetrics>() { + fut.listen(new GridClientFutureListener<GridClientDataMetrics>() { @Override public void onDone(GridClientFuture<GridClientDataMetrics> fut) { try { metrics = fut.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientFutureAdapter.java index d22157a..37feaea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientFutureAdapter.java @@ -21,7 +21,6 @@ import org.apache.ignite.internal.client.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; -import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; import java.util.logging.*; @@ -201,7 +200,7 @@ public class GridClientFutureAdapter<R> extends AbstractQueuedSynchronizer imple * * @param lsnrs Listeners to be registered. */ - @Override public void listenAsync(final GridClientFutureListener<R>... lsnrs) { + @Override public void listen(final GridClientFutureListener<R>... lsnrs) { assert lsnrs != null; for (GridClientFutureListener<R> lsnr : lsnrs) @@ -212,26 +211,6 @@ public class GridClientFutureAdapter<R> extends AbstractQueuedSynchronizer imple } /** - * Removes listeners registered before. - * - * @param lsnrs Listeners to be removed. - */ - @Override public void stopListenAsync(GridClientFutureListener<R>... lsnrs) { - Collection<GridClientFutureListener<R>> lsnrsCol = lsnrs == null ? null : Arrays.asList(lsnrs); - - for (Iterator<DoneCallback> it = cbs.iterator(); it.hasNext();) { - DoneCallback cb = it.next(); - - if (cb.lsnr == null) - continue; - - // Remove all listeners, if passed listeners collection is 'null'. - if (lsnrsCol == null || lsnrsCol.contains(cb.lsnr)) - it.remove(); - } - } - - /** * Creates future's chain and completes chained future, when this future finishes. * * @param cb Future callback to convert this future result into expected format. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java index 44d10f0..a87e681 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java @@ -420,7 +420,7 @@ public class GridClientNioTcpConnection extends GridClientConnection { lastMsgSndTime = U.currentTimeMillis(); if (routeMode) { - sndFut.listenAsync(new CI1<GridNioFuture<?>>() { + sndFut.listen(new CI1<GridNioFuture<?>>() { @Override public void apply(GridNioFuture<?> sndFut) { try { sndFut.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java index dc0ccbc..0bf416c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java @@ -101,7 +101,7 @@ public abstract class GridTcpRouterNioListenerAdapter implements GridNioServerLi try { client.forwardMessage(routerMsg, routerMsg.destinationId(), ses.<Byte>meta(MARSHALLER_ID.ordinal())) - .listenAsync(new GridClientFutureListener() { + .listen(new GridClientFutureListener() { @Override public void onDone(GridClientFuture fut) { try { GridRouterResponse res = (GridRouterResponse)fut.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java index ce5431f..e5ea36c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java @@ -488,7 +488,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus if (cnt.decrementAndGet() == 0) comp.markInitialized(); - fut.listenAsync(new CI1<IgniteInternalFuture<GridTuple3<String, Boolean, String>>>() { + fut.listen(new CI1<IgniteInternalFuture<GridTuple3<String, Boolean, String>>>() { @Override public void apply(IgniteInternalFuture<GridTuple3<String, Boolean, String>> f) { runNextNodeCallable(queue, comp, cnt); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java index 1d0ef09..e1ce58d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java @@ -586,7 +586,7 @@ public class GridExecutorService implements ExecutorService, Externalizable { private <T> Future<T> addFuture(IgniteInternalFuture<T> fut) { synchronized (mux) { if (!fut.isDone()) { - fut.listenAsync(lsnr); + fut.listen(lsnr); futs.add(fut); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 4de52d9..b50472d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1971,7 +1971,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, }); if (statsEnabled) - fut.listenAsync(new UpdateGetTimeStatClosure<V>(metrics0(), start)); + fut.listen(new UpdateGetTimeStatClosure<V>(metrics0(), start)); return fut; } @@ -2013,7 +2013,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, }); if (statsEnabled) - fut.listenAsync(new UpdateGetTimeStatClosure<Map<K, V>>(metrics0(), start)); + fut.listen(new UpdateGetTimeStatClosure<Map<K, V>>(metrics0(), start)); return fut; } @@ -2429,7 +2429,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, IgniteInternalFuture<V> fut = putAsync(key, val, null, -1, filter); if (statsEnabled) - fut.listenAsync(new UpdatePutAndGetTimeStatClosure<V>(metrics0(), start)); + fut.listen(new UpdatePutAndGetTimeStatClosure<V>(metrics0(), start)); return fut; } @@ -2758,7 +2758,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, IgniteInternalFuture<Boolean> fut = putxAsync(key, val, null, -1, filter); if (statsEnabled) - fut.listenAsync(new UpdatePutTimeStatClosure<Boolean>(metrics0(), start)); + fut.listen(new UpdatePutTimeStatClosure<Boolean>(metrics0(), start)); return fut; } @@ -2837,7 +2837,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, })); if(statsEnabled) - fut.listenAsync(new UpdatePutTimeStatClosure<V>(metrics0(), start)); + fut.listen(new UpdatePutTimeStatClosure<V>(metrics0(), start)); return fut; } @@ -2900,7 +2900,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, }); if (statsEnabled) - fut.listenAsync(new UpdatePutTimeStatClosure<Boolean>(metrics0(), start)); + fut.listen(new UpdatePutTimeStatClosure<Boolean>(metrics0(), start)); return fut; } @@ -2954,7 +2954,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, })); if (statsEnabled) - fut.listenAsync(new UpdatePutAndGetTimeStatClosure<V>(metrics0(), start)); + fut.listen(new UpdatePutAndGetTimeStatClosure<V>(metrics0(), start)); return fut; } @@ -3082,7 +3082,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, }); if (statsEnabled) - fut.listenAsync(new UpdatePutAndGetTimeStatClosure<Boolean>(metrics0(), start)); + fut.listen(new UpdatePutAndGetTimeStatClosure<Boolean>(metrics0(), start)); return fut; } @@ -3192,7 +3192,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, IgniteInternalFuture<V> fut = removeAsync(key, null, filter); if (statsEnabled) - fut.listenAsync(new UpdateRemoveTimeStatClosure<V>(metrics0(), start)); + fut.listen(new UpdateRemoveTimeStatClosure<V>(metrics0(), start)); return fut; } @@ -3224,7 +3224,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, })); if (statsEnabled) - fut.listenAsync(new UpdateRemoveTimeStatClosure<V>(metrics0(), start)); + fut.listen(new UpdateRemoveTimeStatClosure<V>(metrics0(), start)); return fut; } @@ -3297,7 +3297,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, }); if (statsEnabled) - fut.listenAsync(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); + fut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); return fut; } @@ -3380,7 +3380,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, }); if (statsEnabled) - fut.listenAsync(new UpdateRemoveTimeStatClosure<Boolean>(metrics0(), start)); + fut.listen(new UpdateRemoveTimeStatClosure<Boolean>(metrics0(), start)); return fut; } @@ -3630,7 +3630,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, }); if (statsEnabled) - fut.listenAsync(new UpdateRemoveTimeStatClosure<Boolean>(metrics0(), start)); + fut.listen(new UpdateRemoveTimeStatClosure<Boolean>(metrics0(), start)); return fut; } @@ -4736,7 +4736,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, asyncOpRelease(); } else { - fut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { asyncOpRelease(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index 2652843..ad89768 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -1081,7 +1081,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V // Thread that prepares future should remove it and install listener. curEvictFut.compareAndSet(fut, null); - fut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { if (!busyLock.enterBusy()) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 8cd7c4b..277c48b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -118,7 +119,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V if (!topFut.isDone()) { final IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c0 = c; - topFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + topFut.listen(new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> t) { onMessage0(nodeId, cacheMsg, c0); } @@ -215,36 +216,43 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']'); // Don't hold this thread waiting for preloading to complete. - startFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - rw.readLock(); - - try { - if (stopping) { - if (log.isDebugEnabled()) - log.debug("Received cache communication message while stopping " + - "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']'); - - return; + startFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(final IgniteInternalFuture<?> f) { + cctx.kernalContext().closure().runLocalSafe( + new GridPlainRunnable() { + @Override public void run() { + rw.readLock(); + + try { + if (stopping) { + if (log.isDebugEnabled()) + log.debug("Received cache communication message while stopping " + + "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']'); + + return; + } + + f.get(); + + if (log.isDebugEnabled()) + log.debug("Start future completed for message [nodeId=" + nodeId + + ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']'); + + processMessage(nodeId, cacheMsg, c); + } + catch (IgniteCheckedException e) { + // Log once. + if (startErr.compareAndSet(false, true)) + U.error(log, "Failed to complete preload start future " + + "(will ignore message) " + + "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e); + } + finally { + rw.readUnlock(); + } + } } - - f.get(); - - if (log.isDebugEnabled()) - log.debug("Start future completed for message [nodeId=" + nodeId + - ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']'); - - processMessage(nodeId, cacheMsg, c); - } - catch (IgniteCheckedException e) { - // Log once. - if (startErr.compareAndSet(false, true)) - U.error(log, "Failed to complete preload start future (will ignore message) " + - "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e); - } - finally { - rw.readUnlock(); - } + ); } }); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java index 7df184e..60921e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java @@ -73,7 +73,7 @@ public final class GridCacheMultiTxFuture<K, V> extends GridFutureAdapter<Boolea for (final IgniteInternalTx<K, V> tx : remainingTxs) { if (!tx.done()) { - tx.finishFuture().listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { + tx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx> t) { remainingTxs.remove(tx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 3bc5f3e..55bc023 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -1010,7 +1010,7 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K, finishFuts.add(finishFut); - finishFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + finishFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> e) { finishFuts.remove(finishFut); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 2b672d9..f63d4a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -133,7 +133,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Discovery event (will start exchange): " + exchId); - locExchFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + locExchFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { if (!enterBusy()) return; @@ -141,7 +141,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana try { // Unwind in the order of discovery events. for (GridDhtPartitionsExchangeFuture<K, V> f = pendingExchangeFuts.poll(); f != null; - f = pendingExchangeFuts.poll()) + f = pendingExchangeFuts.poll()) addFuture(f); } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b77b8db..d1439b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -761,7 +761,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting); if (completionLsnr != null) { - fut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { try { fut.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 10396d0..2dc191f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -180,7 +180,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST, new GlobalRemoveAllCallable<>(name(), topVer), nodes, true); - rmvFut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() { + rmvFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { try { fut.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 04ae561..e3debe8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -632,7 +632,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap expiryPlc, req.skipValues()); - fut.listenAsync(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>>>() { + fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>>>() { @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo<K, V>>> f) { GridNearGetResponse<K, V> res = new GridNearGetResponse<>(ctx.cacheId(), req.futureId(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 608678b..a79e80e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -447,9 +447,9 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { if (!txFut.isDone()) { final ReaderId<K, V> reader0 = reader; - txFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + txFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { synchronized (this) { // Release memory. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 7eda930..eb5fed7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -344,7 +344,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (keyFut == null || keyFut.isDone()) processDhtLockRequest0(nodeId, req); else { - keyFut.listenAsync(new CI1<IgniteInternalFuture<Object>>() { + keyFut.listen(new CI1<IgniteInternalFuture<Object>>() { @Override public void apply(IgniteInternalFuture<Object> t) { processDhtLockRequest0(nodeId, req); } @@ -520,7 +520,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Register listener just so we print out errors. // Exclude lock timeout exception since it's not a fatal exception. - f.listenAsync(CU.errorLogger(log, GridCacheLockTimeoutException.class, + f.listen(CU.errorLogger(log, GridCacheLockTimeoutException.class, GridDistributedLockCancelledException.class)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 6aa159c..747686e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -509,7 +509,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } } else - prep.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { + prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> f) { try { f.get(); // Check for errors of a parent future. @@ -597,7 +597,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements else { prepFut.complete(); - prepFut.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { + prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> f) { try { f.get(); // Check for errors of a parent future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 19b52c6..3429e4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -488,7 +488,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) { IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync(); - fut.listenAsync(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { + fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> gridCacheTxGridFuture) { try { if (replied.compareAndSet(false, true)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 824bd40..8adf230 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -677,13 +677,13 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M // Need to wait for next topology version to remap. IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer); - topFut.listenAsync(new CIX1<IgniteInternalFuture<Long>>() { + topFut.listen(new CIX1<IgniteInternalFuture<Long>>() { @SuppressWarnings("unchecked") @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException { long topVer = fut.get(); // This will append new futures to compound list. - map(F.view(keys.keySet(), new P1<K>() { + map(F.view(keys.keySet(), new P1<K>() { @Override public boolean apply(K key) { return invalidParts.contains(cctx.affinity().partition(key)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index cd3eb59..c7670c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -864,7 +864,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskNameHash); if (statsEnabled) - updateFut.listenAsync(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); + updateFut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); return asyncOp(new CO<IgniteInternalFuture<Object>>() { @Override public IgniteInternalFuture<Object> apply() { @@ -1037,7 +1037,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (forceFut.isDone()) updateAllAsyncInternal0(nodeId, req, completionCb); else { - forceFut.listenAsync(new CI1<IgniteInternalFuture<Object>>() { + forceFut.listen(new CI1<IgniteInternalFuture<Object>>() { @Override public void apply(IgniteInternalFuture<Object> t) { updateAllAsyncInternal0(nodeId, req, completionCb); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index f099aa5..fcbc634 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -415,7 +415,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> snapshot = fut.topologySnapshot(); } else { - fut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + fut.listen(new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> t) { mapOnTopology(keys, remap, oldNodeId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index af1ee1b..3d67e27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -562,7 +562,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity markInitialized(); } else { - fut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + fut.listen(new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> t) { mapOnTopology(); } @@ -830,7 +830,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } } else { - txSync.listenAsync(new CI1<IgniteInternalFuture<?>>() { + txSync.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { try { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index feb6021..be6ec07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -208,7 +208,7 @@ public class GridDhtPartitionDemandPool<K, V> { if (log.isDebugEnabled()) log.debug("Forcing preload event for future: " + exchFut); - exchFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + exchFut.listen(new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> t) { cctx.shared().exchange().forcePreloadExchange(exchFut); } @@ -357,7 +357,7 @@ public class GridDhtPartitionDemandPool<K, V> { obj = new GridTimeoutObjectAdapter(delay) { @Override public void onTimeout() { - exchFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + exchFut.listen(new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> f) { cctx.shared().exchange().forcePreloadExchange(exchFut); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index b33d426..5e5555d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -707,7 +707,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon } } else { - initFut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() { + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> t) { try { if (!t.get()) // Just to check if there was an error. @@ -805,7 +805,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon assert exchId.topologyVersion() == msg.topologyVersion(); - initFut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() { + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> t) { assert msg.lastVersion() != null; @@ -865,7 +865,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon try { // Wait for initialization part of this future to complete. - initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + initFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { if (isDone()) return; @@ -945,7 +945,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon assert rmtNodes != null; - for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext();) + for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) if (it.next().id().equals(nodeId)) it.remove(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index c77580a..75a3ddc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -228,7 +229,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { if (cctx.config().getPreloadPartitionedDelay() >= 0) { U.log(log, "Starting preloading in " + cctx.config().getPreloadMode() + " mode: " + cctx.name()); - demandPool.syncFuture().listenAsync(new CI1<Object>() { + demandPool.syncFuture().listen(new CI1<Object>() { @Override public void apply(Object t) { U.log(log, "Completed preloading in " + cctx.config().getPreloadMode() + " mode " + "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]"); @@ -322,7 +323,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { if (fut.isDone()) processForceKeysRequest0(node, msg); else - fut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { processForceKeysRequest0(node, msg); } @@ -428,7 +429,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Processing affinity assignment request [node=" + node + ", req=" + req + ']'); - cctx.affinity().affinityReadyFuture(req.topologyVersion()).listenAsync(new CI1<IgniteInternalFuture<Long>>() { + cctx.affinity().affinityReadyFuture(req.topologyVersion()).listen(new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> fut) { if (log.isDebugEnabled()) log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + @@ -497,10 +498,10 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { fut.init(); else { if (topReadyFut == null) - startFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + startFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> syncFut) { cctx.kernalContext().closure().runLocalSafe( - new Runnable() { + new GridPlainRunnable() { @Override public void run() { fut.init(); } @@ -515,7 +516,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { compound.markInitialized(); - compound.listenAsync(new CI1<IgniteInternalFuture<?>>() { + compound.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> syncFut) { fut.init(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index ed2565b..0435b92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -741,7 +741,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma final RemapTimeoutObject timeout = new RemapTimeoutObject( cctx.kernalContext().config().getNetworkTimeout(), topVer, e); - cctx.discovery().topologyFuture(topVer + 1).listenAsync(new CI1<IgniteInternalFuture<Long>>() { + cctx.discovery().topologyFuture(topVer + 1).listen(new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> longIgniteFuture) { if (timeout.finish()) { cctx.kernalContext().timeout().removeTimeoutObject(timeout); @@ -793,7 +793,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma // Need to wait for next topology version to remap. IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer); - topFut.listenAsync(new CIX1<IgniteInternalFuture<Long>>() { + topFut.listen(new CIX1<IgniteInternalFuture<Long>>() { @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException { long readyTopVer = fut.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 855b96c..88095d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -690,7 +690,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B markInitialized(); } else { - fut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + fut.listen(new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> t) { mapOnTopology(); } @@ -1108,7 +1108,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B } } else { - txSync.listenAsync(new CI1<IgniteInternalFuture<?>>() { + txSync.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { try { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 8f07892..9ada718 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -29,7 +29,6 @@ import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -762,7 +761,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { IgniteInternalFuture<IgniteInternalTx<K, V>> prepareFut = prepFut.get(); - prepareFut.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { + prepareFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> f) { GridNearTxFinishFuture<K, V> fut0 = commitFut.get(); @@ -831,7 +830,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { } } else { - prepFut.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { + prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> f) { try { // Check for errors in prepare future. @@ -994,7 +993,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { } } else - prep.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { + prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> f) { try { f.get(); // Check for errors of a parent future. @@ -1047,7 +1046,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { fut.finish(); } else - prep.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { + prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> f) { try { f.get(); // Check for errors of a parent future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index a430e48..16a74c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -352,7 +352,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut } } else { - topFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + topFut.listen(new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> t) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 5988069..6e0a85f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -927,7 +927,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { }); if (statsEnabled) - fut.listenAsync(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); + fut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); return fut; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index c9ef668..850900a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -522,7 +522,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage cctx.io().addOrderedHandler(topic, resHnd); - fut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { cctx.io().removeOrderedHandler(topic); } @@ -631,7 +631,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage cctx.io().addOrderedHandler(topic, resHnd); - fut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { cctx.io().removeOrderedHandler(topic); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index b4fbef5..a540d99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -102,8 +102,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte for (Map.Entry<Long, GridFutureAdapter<QueryResult<K, V>>> entry : futs.entrySet()) { final Object recipient = recipient(nodeId, entry.getKey()); - entry.getValue().listenAsync(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() { - @Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) throws IgniteCheckedException { + entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() { + @Override + public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) throws IgniteCheckedException { f.get().closeIfNotShared(recipient); } }); @@ -116,7 +117,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte for (Map.Entry<Long, GridFutureAdapter<FieldsResult>> entry : fieldsFuts.entrySet()) { final Object recipient = recipient(nodeId, entry.getKey()); - entry.getValue().listenAsync(new CIX1<IgniteInternalFuture<FieldsResult>>() { + entry.getValue().listen(new CIX1<IgniteInternalFuture<FieldsResult>>() { @Override public void applyx(IgniteInternalFuture<FieldsResult> f) throws IgniteCheckedException { f.get().closeIfNotShared(recipient); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index dd48ca2..b69c6d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -325,7 +325,7 @@ public class IgniteTxHandler<K, V> { final GridDhtTxLocal<K, V> tx0 = tx; - fut.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { + fut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> txFut) { try { txFut.get(); @@ -573,7 +573,7 @@ public class IgniteTxHandler<K, V> { IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync(); // Only for error logging. - commitFut.listenAsync(CU.errorLogger(log)); + commitFut.listen(CU.errorLogger(log)); return commitFut; } @@ -589,7 +589,7 @@ public class IgniteTxHandler<K, V> { IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync(); // Only for error logging. - rollbackFut.listenAsync(CU.errorLogger(log)); + rollbackFut.listen(CU.errorLogger(log)); return rollbackFut; } @@ -601,7 +601,7 @@ public class IgniteTxHandler<K, V> { IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync(); // Only for error logging. - rollbackFut.listenAsync(CU.errorLogger(log)); + rollbackFut.listen(CU.errorLogger(log)); return rollbackFut; } @@ -756,7 +756,7 @@ public class IgniteTxHandler<K, V> { finish(nodeId, nearTx, req); if (dhtTx != null && !dhtTx.done()) { - dhtTx.finishFuture().listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { + dhtTx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) { sendReply(nodeId, req); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index a7de806..065d545 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1867,7 +1867,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } if (commit) - tx.commitAsync().listenAsync(new CommitListener(tx)); + tx.commitAsync().listen(new CommitListener(tx)); else tx.rollbackAsync(); } @@ -1922,7 +1922,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { tx.writeMap().put(entry.txKey(), entry); } - tx.commitAsync().listenAsync(new CommitListener(tx)); + tx.commitAsync().listen(new CommitListener(tx)); } else tx.rollbackAsync(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java index d470d02..36a5cca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java @@ -151,7 +151,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { ldrs.add(ldr); - ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() { + ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { boolean b = ldrs.remove(ldr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index 81de12d..90f519d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -380,7 +380,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay try { GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(); - resFut.listenAsync(rmvActiveFut); + resFut.listen(rmvActiveFut); activeFuts.add(resFut); @@ -851,7 +851,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay entries = newEntries(); curFut = new GridFutureAdapter<>(); - curFut.listenAsync(signalC); + curFut.listen(signalC); sem = new Semaphore(parallelOps); } @@ -870,7 +870,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay synchronized (this) { curFut0 = curFut; - curFut0.listenAsync(lsnr); + curFut0.listen(lsnr); for (Map.Entry<K, V> entry : newEntries) entries.add(entry); @@ -880,7 +880,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay entries = newEntries(); curFut = new GridFutureAdapter<>(); - curFut.listenAsync(signalC); + curFut.listen(signalC); } } @@ -917,7 +917,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay entries = newEntries(); curFut = new GridFutureAdapter<>(); - curFut.listenAsync(signalC); + curFut.listen(signalC); } } @@ -986,7 +986,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay locFuts.add(fut); - fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() { + fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() { @Override public void apply(IgniteInternalFuture<Object> t) { try { boolean rmv = locFuts.remove(t); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index f3b8773..fb953f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -1046,14 +1046,15 @@ public class IgfsDataManager extends IgfsManager { @Override @Nullable public Object call() throws Exception { - storeBlocksAsync(blocks).listenAsync(new CI1<IgniteInternalFuture<?>>() { + storeBlocksAsync(blocks).listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { try { fut.get(); completionFut.onWriteAck(nodeId, batchId); - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { completionFut.onError(nodeId, e); } } @@ -1277,7 +1278,7 @@ public class IgfsDataManager extends IgfsManager { * @param blocksMsg Write request message. */ private void processBlocksMessage(final UUID nodeId, final IgfsBlocksMessage blocksMsg) { - storeBlocksAsync(blocksMsg.blocks()).listenAsync(new CI1<IgniteInternalFuture<?>>() { + storeBlocksAsync(blocksMsg.blocks()).listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { IgniteCheckedException err = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java index 5afa523..b72f824 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java @@ -494,7 +494,7 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter { if (!evictFut.isDone()) { pendingFuts.add(evictFut); - evictFut.listenAsync(new IgniteInClosure<IgniteInternalFuture<byte[]>>() { + evictFut.listen(new IgniteInClosure<IgniteInternalFuture<byte[]>>() { @Override public void apply(IgniteInternalFuture<byte[]> t) { pendingFuts.remove(evictFut); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index adc0254..d2b94db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -1686,7 +1686,7 @@ public class IgfsMetaManager extends IgfsManager { // Record PURGE event if needed. if (evts.isRecordable(EVT_IGFS_FILE_PURGED)) { - delFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + delFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { try { t.get(); // Ensure delete succeeded. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java index 870ced0..72976d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.ipc.*; import org.apache.ignite.internal.util.ipc.loopback.*; import org.apache.ignite.internal.util.ipc.shmem.*; @@ -288,7 +287,7 @@ public class IgfsServer { } } else { - fut.listenAsync(new CIX1<IgniteInternalFuture<IgfsMessage>>() { + fut.listen(new CIX1<IgniteInternalFuture<IgfsMessage>>() { @Override public void applyx(IgniteInternalFuture<IgfsMessage> fut) { IgfsMessage res; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index bb34e94..850a096 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@ -103,7 +103,7 @@ public class GridRestProcessor extends GridProcessorAdapter { try { IgniteInternalFuture<GridRestResponse> res = handleRequest(req); - res.listenAsync(new IgniteInClosure<IgniteInternalFuture<GridRestResponse>>() { + res.listen(new IgniteInClosure<IgniteInternalFuture<GridRestResponse>>() { @Override public void apply(IgniteInternalFuture<GridRestResponse> f) { try { fut.onDone(f.get()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java index 87f0fc3..0da83bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java @@ -224,7 +224,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter { fut.onDone(res); } - taskFut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() { + taskFut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() { @Override public void apply(IgniteInternalFuture<Object> taskFut) { try { TaskDescriptor desc;