# ignite-26
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f4ca6583 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f4ca6583 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f4ca6583 Branch: refs/heads/ignite-26 Commit: f4ca65834259a9a18ac4f62afacac531ffb4cdcc Parents: aad2750 Author: sboikov <sboi...@gridgain.com> Authored: Thu Jan 29 18:32:07 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jan 29 18:32:07 2015 +0300 ---------------------------------------------------------------------- .../internal/TaskEventSubjectIdSelfTest.java | 3 +- .../ignite/compute/ComputeTaskFuture.java | 4 +- .../compute/ComputeTaskTimeoutException.java | 41 +++++++++++++++- .../aop/GridifySetToSetAbstractAspect.java | 4 +- .../aop/GridifySetToValueAbstractAspect.java | 4 +- .../ignite/internal/IgniteComputeImpl.java | 7 +++ .../internal/executor/GridExecutorService.java | 26 ++++++---- .../processors/cache/GridCacheAdapter.java | 8 ++-- .../processors/cache/IgniteCacheProxy.java | 50 ++++++++++++-------- .../processors/task/GridTaskProcessor.java | 16 +++---- .../processors/task/GridTaskWorker.java | 9 ++-- .../ignite/internal/util/IgniteUtils.java | 3 ++ .../internal/util/future/IgniteFutureImpl.java | 7 +++ .../apache/ignite/lang/IgniteAsyncSupport.java | 4 +- .../ignite/lang/IgniteAsyncSupportAdapter.java | 23 +++++++-- .../GridEventStorageCheckAllEventsSelfTest.java | 3 +- .../GridJobCollisionCancelSelfTest.java | 3 +- .../internal/GridProjectionAbstractTest.java | 20 ++++---- .../internal/GridTaskTimeoutSelfTest.java | 5 +- .../closure/GridClosureProcessorSelfTest.java | 39 +++++++-------- 20 files changed, 182 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/clients/src/test/java/org/apache/ignite/internal/TaskEventSubjectIdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/TaskEventSubjectIdSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/TaskEventSubjectIdSelfTest.java index e048846..17ed842 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/TaskEventSubjectIdSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/TaskEventSubjectIdSelfTest.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.lang.*; import org.apache.ignite.client.*; import org.apache.ignite.testframework.*; @@ -200,7 +201,7 @@ public class TaskEventSubjectIdSelfTest extends GridCommonAbstractTest { return null; } }, - ComputeTaskTimeoutException.class, + ComputeTaskTimeoutCheckedException.class, null ); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskFuture.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskFuture.java index d5401fa..9b6bf4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskFuture.java @@ -32,14 +32,14 @@ public interface ComputeTaskFuture<R> extends IgniteFuture<R> { /** * {@inheritDoc} * - * @throws ComputeTaskTimeoutException If task execution timed out. + * @throws org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException If task execution timed out. */ @Override public R get(); /** * {@inheritDoc} * - * @throws ComputeTaskTimeoutException If task execution timed out. + * @throws org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException If task execution timed out. */ @Override public R get(long timeout, TimeUnit unit); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskTimeoutException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskTimeoutException.java index 9739227..805b44e 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskTimeoutException.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskTimeoutException.java @@ -17,8 +17,45 @@ package org.apache.ignite.compute; +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + /** - * TODO + * This exception indicates that task execution timed out. It is thrown from + * {@link org.apache.ignite.compute.ComputeTaskFuture#get()} method. */ -public class ComputeTaskTimeoutException { +public class ComputeTaskTimeoutException extends IgniteException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates task timeout exception with given task execution ID and + * error message. + * + * @param msg Error message. + */ + public ComputeTaskTimeoutException(String msg) { + super(msg); + } + + /** + * Creates new task timeout exception given throwable as a cause and + * source of error message. + * + * @param cause Non-null throwable cause. + */ + public ComputeTaskTimeoutException(Throwable cause) { + this(cause.getMessage(), cause); + } + + /** + * Creates task timeout exception with given task execution ID, + * error message and optional nested exception. + * + * @param msg Error message. + * @param cause Optional nested exception (can be {@code null}). + */ + public ComputeTaskTimeoutException(String msg, @Nullable Throwable cause) { + super(msg, cause); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/compute/gridify/aop/GridifySetToSetAbstractAspect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/gridify/aop/GridifySetToSetAbstractAspect.java b/modules/core/src/main/java/org/apache/ignite/compute/gridify/aop/GridifySetToSetAbstractAspect.java index 00189bb..0f293de 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/gridify/aop/GridifySetToSetAbstractAspect.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/gridify/aop/GridifySetToSetAbstractAspect.java @@ -18,8 +18,8 @@ package org.apache.ignite.compute.gridify.aop; import org.apache.ignite.*; -import org.apache.ignite.compute.*; import org.apache.ignite.compute.gridify.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.gridify.*; import java.lang.reflect.*; @@ -136,7 +136,7 @@ public class GridifySetToSetAbstractAspect { end = Long.MAX_VALUE; if (now > end) - throw new ComputeTaskTimeoutException("Timeout occurred while waiting for completion."); + throw new ComputeTaskTimeoutCheckedException("Timeout occurred while waiting for completion."); Collection<?> res = compute.withTimeout(timeout == 0 ? 0L : (end - now)).execute( new GridifyDefaultRangeTask(cls, nodeFilter, threshold, splitSize, false), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/compute/gridify/aop/GridifySetToValueAbstractAspect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/gridify/aop/GridifySetToValueAbstractAspect.java b/modules/core/src/main/java/org/apache/ignite/compute/gridify/aop/GridifySetToValueAbstractAspect.java index 447e88b..46d9aa0 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/gridify/aop/GridifySetToValueAbstractAspect.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/gridify/aop/GridifySetToValueAbstractAspect.java @@ -18,8 +18,8 @@ package org.apache.ignite.compute.gridify.aop; import org.apache.ignite.*; -import org.apache.ignite.compute.*; import org.apache.ignite.compute.gridify.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.gridify.*; import java.lang.reflect.*; @@ -134,7 +134,7 @@ public class GridifySetToValueAbstractAspect { while (true) { if (now > end) - throw new ComputeTaskTimeoutException("Timeout occurred while waiting for completion."); + throw new ComputeTaskTimeoutCheckedException("Timeout occurred while waiting for completion."); GridifyRangeArgument taskArg = createGridifyArgument(arg, res); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java index c8635ef..913dee9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java @@ -507,6 +507,13 @@ public class IgniteComputeImpl extends IgniteAsyncSupportAdapter<IgniteCompute> } /** {@inheritDoc} */ + @Override protected <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) { + assert fut instanceof ComputeTaskInternalFuture : fut; + + return ((ComputeTaskInternalFuture<R>)fut).publicFuture(); + } + + /** {@inheritDoc} */ @Override public <R> ComputeTaskFuture<R> future() { return (ComputeTaskFuture<R>)super.future(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java index 0074585..9f509b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal.executor; import org.apache.ignite.*; -import org.apache.ignite.compute.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.lang.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; @@ -213,7 +213,7 @@ public class GridExecutorService implements ExecutorService, Externalizable { try { fut.get(end - now); } - catch (ComputeTaskTimeoutException e) { + catch (ComputeTaskTimeoutCheckedException e) { U.error(log, "Failed to get task result: " + fut, e); return false; @@ -242,9 +242,11 @@ public class GridExecutorService implements ExecutorService, Externalizable { try { comp.call(task); - return addFuture(comp.<T>future()); + IgniteFutureImpl<T> fut = (IgniteFutureImpl<T>)comp.future(); + + return addFuture(fut.internalFuture()); } - catch (IgniteCheckedException e) { + catch (IgniteException e) { // Should not be thrown since uses asynchronous execution. return addFuture(new GridFinishedFutureEx<T>(e)); } @@ -261,7 +263,9 @@ public class GridExecutorService implements ExecutorService, Externalizable { try { comp.run(task); - IgniteInternalFuture<T> fut = comp.future().chain(new CX1<IgniteInternalFuture<?>, T>() { + IgniteInternalFuture<T> fut0 = ((IgniteFutureImpl<T>)comp.future()).internalFuture(); + + IgniteInternalFuture<T> fut = fut0.chain(new CX1<IgniteInternalFuture<?>, T>() { @Override public T applyx(IgniteInternalFuture<?> fut) throws IgniteCheckedException { fut.get(); @@ -271,7 +275,7 @@ public class GridExecutorService implements ExecutorService, Externalizable { return addFuture(fut); } - catch (IgniteCheckedException e) { + catch (IgniteException e) { // Should not be thrown since uses asynchronous execution. return addFuture(new GridFinishedFutureEx<T>(e)); } @@ -288,9 +292,11 @@ public class GridExecutorService implements ExecutorService, Externalizable { try { comp.run(task); - return addFuture(comp.future()); + IgniteFutureImpl<?> fut = (IgniteFutureImpl<?>)comp.future(); + + return addFuture(fut.internalFuture()); } - catch (IgniteCheckedException e) { + catch (IgniteException e) { // Should not be thrown since uses asynchronous execution. return addFuture(new GridFinishedFutureEx<>(e)); } @@ -376,7 +382,7 @@ public class GridExecutorService implements ExecutorService, Externalizable { try { fut.get(end - now); } - catch (ComputeTaskTimeoutException ignore) { + catch (ComputeTaskTimeoutCheckedException ignore) { if (log.isDebugEnabled()) log.debug("Timeout occurred during getting task result: " + fut); @@ -715,7 +721,7 @@ public class GridExecutorService implements ExecutorService, Externalizable { throw e2; } - catch (ComputeTaskTimeoutException e) { + catch (ComputeTaskTimeoutCheckedException e) { throw new ExecutionException("Task execution timed out during waiting for task result: " + fut, e); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 01c22b5..f84fef6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -23,11 +23,11 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.fs.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; @@ -1314,7 +1314,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (log.isDebugEnabled()) log.debug("All remote nodes left while cache clear [cacheName=" + name() + "]"); } - catch (ComputeTaskTimeoutException e) { + catch (ComputeTaskTimeoutCheckedException e) { U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + "'networkTimeout' configuration property) [cacheName=" + name() + "]"); @@ -3832,7 +3832,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param args Arguments. * @throws IgniteCheckedException If failed. */ - IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) + IgniteFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException { ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name()); @@ -4170,7 +4170,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return primaryOnly ? primarySize() : size(); } - catch (ComputeTaskTimeoutException e) { + catch (ComputeTaskTimeoutCheckedException e) { U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + "'networkTimeout' configuration property) [cacheName=" + name() + "]"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index dd7b781..4560f4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.lang.*; import org.apache.ignite.mxbean.*; import org.apache.ignite.internal.util.tostring.*; @@ -173,7 +174,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) - curFut.set(delegate.<K, V>cache().loadCacheAsync(p, 0, args)); + setFuture(delegate.<K, V>cache().loadCacheAsync(p, 0, args)); else delegate.<K, V>cache().loadCache(p, 0, args); } @@ -193,7 +194,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.putIfAbsentAsync(key, val)); + setFuture(delegate.putIfAbsentAsync(key, val)); return null; } @@ -333,7 +334,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.getAsync(key)); + setFuture(delegate.getAsync(key)); return null; } @@ -356,7 +357,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.getAllAsync(keys)); + setFuture(delegate.getAllAsync(keys)); return null; } @@ -382,7 +383,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.getAllAsync(keys)); + setFuture(delegate.getAllAsync(keys)); return null; } @@ -438,7 +439,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.containsKeyAsync(key)); + setFuture(delegate.containsKeyAsync(key)); return false; } @@ -488,7 +489,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) - curFut.set(delegate.putxAsync(key, val)); + setFuture(delegate.putxAsync(key, val)); else delegate.putx(key, val); } @@ -508,7 +509,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.putAsync(key, val)); + setFuture(delegate.putAsync(key, val)); return null; } @@ -531,7 +532,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) - curFut.set(delegate.putAllAsync(map)); + setFuture(delegate.putAllAsync(map)); else delegate.putAll(map); } @@ -551,7 +552,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.putxIfAbsentAsync(key, val)); + setFuture(delegate.putxIfAbsentAsync(key, val)); return false; } @@ -574,7 +575,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.removexAsync(key)); + setFuture(delegate.removexAsync(key)); return false; } @@ -597,7 +598,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.removeAsync(key, oldVal)); + setFuture(delegate.removeAsync(key, oldVal)); return false; } @@ -620,7 +621,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.removeAsync(key)); + setFuture(delegate.removeAsync(key)); return null; } @@ -643,7 +644,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.replaceAsync(key, oldVal, newVal)); + setFuture(delegate.replaceAsync(key, oldVal, newVal)); return false; } @@ -666,7 +667,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.replacexAsync(key, val)); + setFuture(delegate.replacexAsync(key, val)); return false; } @@ -689,7 +690,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.replaceAsync(key, val)); + setFuture(delegate.replaceAsync(key, val)); return null; } @@ -712,7 +713,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) - curFut.set(delegate.removeAllAsync(keys)); + setFuture(delegate.removeAllAsync(keys)); else delegate.removeAll(keys); } @@ -734,7 +735,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) - curFut.set(delegate.removeAllAsync(keys)); + setFuture(delegate.removeAllAsync(keys)); else delegate.removeAll(keys); } @@ -798,7 +799,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach } }); - curFut.set(fut0); + setFuture(fut0); return null; } @@ -826,7 +827,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.invokeAllAsync(keys, entryProcessor, args)); + setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); return null; } @@ -851,7 +852,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) { - curFut.set(delegate.invokeAllAsync(map, args)); + setFuture(delegate.invokeAllAsync(map, args)); return null; } @@ -1074,6 +1075,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach return new CacheException(e); } + /** + * @param fut Future for async operation. + */ + private <R> void setFuture(IgniteInternalFuture<R> fut) { + curFut.set(new IgniteFutureImpl<>(fut)); + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index 83abea6..ad7808f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -278,7 +278,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { * @param <T> Task argument type. * @param <R> Task return value type. */ - public <T, R> GridTaskFutureImpl<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) { + public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) { assert taskCls != null; lock.readLock(); @@ -301,7 +301,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { * @param <T> Task argument type. * @param <R> Task return value type. */ - public <T, R> GridTaskFutureImpl<R> execute(ComputeTask<T, R> task, @Nullable T arg) { + public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg) { return execute(task, arg, false); } @@ -313,7 +313,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { * @param <T> Task argument type. * @param <R> Task return value type. */ - public <T, R> GridTaskFutureImpl<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys) { + public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys) { lock.readLock(); try { @@ -349,7 +349,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { * @param <T> Task argument type. * @param <R> Task return value type. */ - public <T, R> GridTaskFutureImpl<R> execute(String taskName, @Nullable T arg) { + public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, @Nullable T arg) { assert taskName != null; lock.readLock(); @@ -375,7 +375,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { * @return Task future. */ @SuppressWarnings("unchecked") - private <T, R> GridTaskFutureImpl<R> startTask( + private <T, R> ComputeTaskInternalFuture<R> startTask( @Nullable String taskName, @Nullable Class<?> taskCls, @Nullable ComputeTask<T, R> task, @@ -535,7 +535,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { fullSup, subjId); - GridTaskFutureImpl<R> fut = new GridTaskFutureImpl<>(ses, ctx); + ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx); IgniteCheckedException securityEx = null; @@ -631,7 +631,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { Map<IgniteUuid, ComputeTaskFuture<R>> res = U.newHashMap(tasks.size()); for (GridTaskWorker taskWorker : tasks.values()) { - GridTaskFutureImpl<R> fut = taskWorker.getTaskFuture(); + ComputeTaskInternalFuture<R> fut = taskWorker.getTaskFuture(); res.put(fut.getTaskSession().getId(), fut.publicFuture()); } @@ -719,7 +719,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { * @param fut Task future. * @param <R> Result type. */ - private <R> void handleException(Throwable ex, GridTaskFutureImpl<R> fut) { + private <R> void handleException(Throwable ex, ComputeTaskInternalFuture<R> fut) { assert ex != null; assert fut != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index fc93da0..bc0160a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -24,6 +24,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.fs.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.resources.*; @@ -89,7 +90,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { private final GridTaskSessionImpl ses; /** */ - private final GridTaskFutureImpl<R> fut; + private final ComputeTaskInternalFuture<R> fut; /** */ private final T arg; @@ -194,7 +195,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { GridKernalContext ctx, @Nullable T arg, GridTaskSessionImpl ses, - GridTaskFutureImpl<R> fut, + ComputeTaskInternalFuture<R> fut, @Nullable Class<?> taskCls, @Nullable ComputeTask<T, R> task, GridDeployment dep, @@ -258,7 +259,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { /** * @return Task future. */ - GridTaskFutureImpl<R> getTaskFuture() { + ComputeTaskInternalFuture<R> getTaskFuture() { return fut; } @@ -308,7 +309,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { recordTaskEvent(EVT_TASK_TIMEDOUT, "Task has timed out."); - Throwable e = new ComputeTaskTimeoutException("Task timed out (check logs for error messages): " + ses); + Throwable e = new ComputeTaskTimeoutCheckedException("Task timed out (check logs for error messages): " + ses); finishTask(null, e); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 8aacd6b..fdb769d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -25,6 +25,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.mxbean.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -9157,6 +9158,8 @@ public abstract class IgniteUtils { return new ClusterTopologyException(e.getMessage(), e.getCause()); else if (e instanceof IgniteDeploymentCheckedException) return new IgniteDeploymentException(e.getMessage(), e.getCause()); + else if (e instanceof ComputeTaskTimeoutCheckedException) + return new ComputeTaskTimeoutException(e.getMessage(), e.getCause()); else if (e.getCause() instanceof IgniteException) return (IgniteException)e.getCause(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java index e78e8df..8c24cba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java @@ -43,6 +43,13 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> { this.fut = fut; } + /** + * @return Internal future. + */ + public IgniteInternalFuture<V> internalFuture() { + return fut; + } + /** {@inheritDoc} */ @Override public long startTime() { return fut.startTime(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java index 23226a1..4540e8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java @@ -17,8 +17,6 @@ package org.apache.ignite.lang; -import org.apache.ignite.internal.*; - /** * TODO: Add interface description. */ @@ -40,5 +38,5 @@ public interface IgniteAsyncSupport { * * @return Future for previous asynchronous operation. */ - public <R> IgniteInternalFuture<R> future(); + public <R> IgniteFuture<R> future(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java index 3d8e14c..e9fd493 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java @@ -19,13 +19,14 @@ package org.apache.ignite.lang; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.future.*; /** * Adapter for {@link IgniteAsyncSupport}. */ public class IgniteAsyncSupportAdapter<T extends IgniteAsyncSupport> implements IgniteAsyncSupport { /** Future for previous asynchronous operation. */ - protected ThreadLocal<IgniteInternalFuture<?>> curFut; + protected ThreadLocal<IgniteFuture<?>> curFut; /** * Default constructor. @@ -43,6 +44,7 @@ public class IgniteAsyncSupportAdapter<T extends IgniteAsyncSupport> implements } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public T withAsync() { if (isAsync()) return (T)this; @@ -52,6 +54,8 @@ public class IgniteAsyncSupportAdapter<T extends IgniteAsyncSupport> implements /** * Creates component with asynchronous mode enabled. + * + * @return Component with asynchronous mode enabled. */ protected T createAsyncInstance() { throw new UnsupportedOperationException(); @@ -63,18 +67,19 @@ public class IgniteAsyncSupportAdapter<T extends IgniteAsyncSupport> implements } /** {@inheritDoc} */ - @Override public <R> IgniteInternalFuture<R> future() { + @SuppressWarnings("unchecked") + @Override public <R> IgniteFuture<R> future() { if (curFut == null) throw new IllegalStateException("Asynchronous mode is disabled."); - IgniteInternalFuture<?> fut = curFut.get(); + IgniteFuture<?> fut = curFut.get(); if (fut == null) throw new IllegalStateException("Asynchronous operation not started."); curFut.set(null); - return (IgniteInternalFuture<R>)fut; + return (IgniteFuture<R>)fut; } /** @@ -85,11 +90,19 @@ public class IgniteAsyncSupportAdapter<T extends IgniteAsyncSupport> implements */ public <R> R saveOrGet(IgniteInternalFuture<R> fut) throws IgniteCheckedException { if (curFut != null) { - curFut.set(fut); + curFut.set(createFuture(fut)); return null; } else return fut.get(); } + + /** + * @param fut Internal future. + * @return Public API future. + */ + protected <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) { + return new IgniteFutureImpl<>(fut); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java index 2c0b1dd..97d42b4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; @@ -189,7 +190,7 @@ public class GridEventStorageCheckAllEventsSelfTest extends GridCommonAbstractTe assert false : "Task should fail."; } - catch (ComputeTaskTimeoutException e) { + catch (ComputeTaskTimeoutCheckedException e) { info("Expected timeout exception caught [taskFuture=" + fut + ", exception=" + e + ']'); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java index bf967a6..1bbd671 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.collision.*; @@ -90,7 +91,7 @@ public class GridJobCollisionCancelSelfTest extends GridCommonAbstractTest { "Invalid collision resolution count: " + colResolutionCnt; } } - catch (ComputeTaskTimeoutException e) { + catch (ComputeTaskTimeoutCheckedException e) { error("Task execution got timed out.", e); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java index 8350f81..2879e51 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java @@ -327,7 +327,7 @@ public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest comp.broadcast(runJob); - IgniteInternalFuture fut = comp.future(); + IgniteFuture fut = comp.future(); waitForExecution(fut); @@ -349,7 +349,7 @@ public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest comp.run(jobs); - IgniteInternalFuture fut = comp.future(); + IgniteFuture fut = comp.future(); waitForExecution(fut); @@ -369,7 +369,7 @@ public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest comp.broadcast(calJob); - IgniteInternalFuture fut = comp.future(); + IgniteFuture fut = comp.future(); waitForExecution(fut); @@ -391,7 +391,7 @@ public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest comp.call(jobs); - IgniteInternalFuture fut = comp.future(); + IgniteFuture fut = comp.future(); waitForExecution(fut); @@ -411,7 +411,7 @@ public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest comp.apply(clrJob, (String) null); - IgniteInternalFuture fut = comp.future(); + IgniteFuture fut = comp.future(); waitForExecution(fut); @@ -433,7 +433,7 @@ public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest comp.apply(clrJob, args); - IgniteInternalFuture fut = comp.future(); + IgniteFuture fut = comp.future(); waitForExecution(fut); @@ -453,7 +453,7 @@ public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest comp.broadcast(new TestClosure(), "arg"); - IgniteInternalFuture<Collection<String>> fut = comp.future(); + IgniteFuture <Collection<String>> fut = comp.future(); waitForExecution(fut); @@ -480,7 +480,7 @@ public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest comp.apply(clrJob, args, rdc); - IgniteInternalFuture fut = comp.future(); + IgniteFuture fut = comp.future(); waitForExecution(fut); @@ -502,7 +502,7 @@ public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest comp.call(jobs, rdc); - IgniteInternalFuture fut = comp.future(); + IgniteFuture fut = comp.future(); waitForExecution(fut); @@ -602,7 +602,7 @@ public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest * @throws InterruptedException Thrown if wait was interrupted. */ @SuppressWarnings({"UnconditionalWait"}) - private void waitForExecution(IgniteInternalFuture fut) throws InterruptedException { + private void waitForExecution(IgniteFuture fut) throws InterruptedException { long sleep = 250; long threshold = System.currentTimeMillis() + WAIT_TIMEOUT; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/test/java/org/apache/ignite/internal/GridTaskTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskTimeoutSelfTest.java index 9c8d5a4..7bffa95 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskTimeoutSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.apache.ignite.internal.util.typedef.*; @@ -85,7 +86,7 @@ public class GridTaskTimeoutSelfTest extends GridCommonAbstractTest { assert false : "GridComputeTaskTimeoutException was not thrown (synchronous apply)"; } - catch (ComputeTaskTimeoutException e) { + catch (ComputeTaskTimeoutCheckedException e) { info("Received expected timeout exception (synchronous apply): " + e); } @@ -150,7 +151,7 @@ public class GridTaskTimeoutSelfTest extends GridCommonAbstractTest { assert false : "Task has not been timed out. Future: " + fut; } - catch (ComputeTaskTimeoutException ignored) { + catch (ComputeTaskTimeoutCheckedException ignored) { // Expected. } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4ca6583/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java index f5e8e26..a9c5066 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java @@ -21,7 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.resources.*; @@ -190,7 +190,7 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { * @return Future object. * @throws IgniteCheckedException If failed. */ - private IgniteInternalFuture<?> runAsync(int idx, Runnable job, @Nullable IgnitePredicate<ClusterNode> p) + private ComputeTaskFuture<?> runAsync(int idx, Runnable job, @Nullable IgnitePredicate<ClusterNode> p) throws IgniteCheckedException { assert idx >= 0 && idx < NODES_CNT; assert job != null; @@ -213,7 +213,7 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { * @return Future object. * @throws IgniteCheckedException If failed. */ - private IgniteInternalFuture<?> broadcast(int idx, Runnable job, @Nullable IgnitePredicate<ClusterNode> p) + private ComputeTaskFuture<?> broadcast(int idx, Runnable job, @Nullable IgnitePredicate<ClusterNode> p) throws IgniteCheckedException { assert idx >= 0 && idx < NODES_CNT; assert job != null; @@ -237,10 +237,11 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { * @param jobs Runnable jobs. * @param p Optional node predicate. * @return Future object. - * @throws IgniteCheckedException If failed. */ - private IgniteInternalFuture<?> runAsync(int idx, Collection<TestRunnable> jobs, @Nullable IgnitePredicate<ClusterNode> p) - throws IgniteCheckedException { + private ComputeTaskFuture<?> runAsync(int idx, + Collection<TestRunnable> jobs, + @Nullable IgnitePredicate<ClusterNode> p) + { assert idx >= 0 && idx < NODES_CNT; assert !F.isEmpty(jobs); @@ -260,10 +261,10 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { * @param job Callable job. * @param p Optional node predicate. * @return Future object. - * @throws IgniteCheckedException If failed. */ - private IgniteInternalFuture<Integer> callAsync(int idx, Callable<Integer> job, @Nullable IgnitePredicate<ClusterNode> p) - throws IgniteCheckedException { + private ComputeTaskFuture<Integer> callAsync(int idx, + Callable<Integer> job, @Nullable + IgnitePredicate<ClusterNode> p) { assert idx >= 0 && idx < NODES_CNT; assert job != null; @@ -285,7 +286,7 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { * @return Future object. * @throws IgniteCheckedException If failed. */ - private IgniteInternalFuture<Collection<Integer>> broadcast(int idx, Callable<Integer> job, + private ComputeTaskFuture<Collection<Integer>> broadcast(int idx, Callable<Integer> job, @Nullable IgnitePredicate<ClusterNode> p) throws IgniteCheckedException { assert idx >= 0 && idx < NODES_CNT; assert job != null; @@ -308,7 +309,7 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { * @return Future object. * @throws IgniteCheckedException If failed. */ - private IgniteInternalFuture<Collection<Integer>> callAsync(int idx, Collection<TestCallable> jobs, + private ComputeTaskFuture<Collection<Integer>> callAsync(int idx, Collection<TestCallable> jobs, @Nullable IgnitePredicate<ClusterNode> p) throws IgniteCheckedException { assert idx >= 0 && idx < NODES_CNT; assert !F.isEmpty(jobs); @@ -342,7 +343,7 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { public void testRunAsyncSingle() throws Exception { Runnable job = new TestRunnable(); - IgniteInternalFuture<?> fut = broadcast(0, job, null); + ComputeTaskFuture<?> fut = broadcast(0, job, null); assert fut.get() == null; @@ -368,7 +369,7 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { public void testRunAsyncMultiple() throws Exception { Collection<TestRunnable> jobs = F.asList(new TestRunnable(), new TestRunnable()); - IgniteInternalFuture<?> fut = runAsync(0, jobs, null); + ComputeTaskFuture<?> fut = runAsync(0, jobs, null); assert fut.get() == null : "Execution result must be null."; @@ -382,7 +383,7 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { public void testCallAsyncSingle() throws Exception { Callable<Integer> job = new TestCallable(); - IgniteInternalFuture<Collection<Integer>> fut1 = broadcast(0, job, null); + ComputeTaskFuture<Collection<Integer>> fut1 = broadcast(0, job, null); assert fut1.get() != null; @@ -395,7 +396,7 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { assertEquals(1, execCntr.get()); - IgniteInternalFuture<Integer> fut2 = callAsync(0, job, null); + ComputeTaskFuture<Integer> fut2 = callAsync(0, job, null); assert fut2.get() == 1 : "Execution result must be equal to 1, actual: " + fut2.get(); @@ -412,14 +413,14 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { comp.withNoFailover().call(new TestCallableError()); - IgniteInternalFuture<Integer> fut = comp.future(); + ComputeTaskFuture<Integer> fut = comp.future(); try { fut.get(); assert false : "Exception should have been thrown."; } - catch (IgniteCheckedException e) { + catch (IgniteException e) { info("Caught expected exception: " + e); } } @@ -468,7 +469,7 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { public void testCallAsyncMultiple() throws Exception { Collection<TestCallable> jobs = F.asList(new TestCallable(), new TestCallable()); - IgniteInternalFuture<Collection<Integer>> fut = callAsync(0, jobs, null); + ComputeTaskFuture<Collection<Integer>> fut = callAsync(0, jobs, null); Collection<Integer> results = fut.get(); @@ -491,7 +492,7 @@ public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { comp.call(jobs, F.sumIntReducer()); - IgniteInternalFuture<Integer> fut = comp.future(); + ComputeTaskFuture<Integer> fut = comp.future(); // Sum of arithmetic progression. int exp = (1 + jobs.size()) * jobs.size() / 2;