http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java index 031641f..2f4d2f6 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java @@ -813,7 +813,7 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe if (verCheckErr != null) U.error(log, verCheckErr.getMessage()); - else if (X.hasCause(e, InterruptedException.class, GridInterruptedException.class)) + else if (X.hasCause(e, InterruptedException.class, InternalInterruptedException.class)) U.warn(log, "Grid startup routine has been interrupted (will rollback)."); else U.error(log, "Got exception while starting (will rollback startup routine).", e); @@ -2490,7 +2490,7 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe try { compute().undeployTask(taskName); } - catch (IgniteCheckedException e) { + catch (IgniteException e) { throw U.jmException(e); } } @@ -2501,7 +2501,7 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe try { return compute().<String, String>execute(taskName, arg); } - catch (IgniteCheckedException e) { + catch (IgniteException e) { throw U.jmException(e); } } @@ -2526,7 +2526,7 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe * @see #isSmtpEnabled() * @see org.apache.ignite.configuration.IgniteConfiguration#getAdminEmails() */ - @Override public IgniteFuture<Boolean> sendAdminEmailAsync(String subj, String body, boolean html) { + @Override public InternalFuture<Boolean> sendAdminEmailAsync(String subj, String body, boolean html) { A.notNull(subj, "subj"); A.notNull(body, "body"); @@ -2618,7 +2618,7 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe * @throws IgniteCheckedException In case of error. * @see {@link org.apache.ignite.IgniteCluster#startNodes(java.io.File, boolean, int, int)}. */ - IgniteFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(File file, boolean restart, int timeout, int maxConn) throws IgniteCheckedException { + InternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(File file, boolean restart, int timeout, int maxConn) throws IgniteCheckedException { A.notNull(file, "file"); A.ensure(file.exists(), "file doesn't exist."); A.ensure(file.isFile(), "file is a directory."); @@ -2660,7 +2660,7 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe * @throws IgniteCheckedException In case of error. * @see {@link org.apache.ignite.IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int)}. */ - IgniteFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync( + InternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync( Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, boolean restart, int timeout, int maxConn) throws IgniteCheckedException { A.notNull(hosts, "hosts"); @@ -2801,15 +2801,15 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe if (call == null) return false; - IgniteFuture<GridTuple3<String, Boolean, String>> fut = ctx.closure().callLocalSafe(call, true); + InternalFuture<GridTuple3<String, Boolean, String>> fut = ctx.closure().callLocalSafe(call, true); comp.add(fut); if (cnt.decrementAndGet() == 0) comp.markInitialized(); - fut.listenAsync(new CI1<IgniteFuture<GridTuple3<String, Boolean, String>>>() { - @Override public void apply(IgniteFuture<GridTuple3<String, Boolean, String>> f) { + fut.listenAsync(new CI1<InternalFuture<GridTuple3<String, Boolean, String>>>() { + @Override public void apply(InternalFuture<GridTuple3<String, Boolean, String>> f) { runNextNodeCallable(queue, comp, cnt); } });
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/GridTaskFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTaskFutureImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTaskFutureImpl.java index 77cb10e..e2509f5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTaskFutureImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTaskFutureImpl.java @@ -23,7 +23,7 @@ import java.util.*; * This class provide implementation for task future. * @param <R> Type of the task result returning from {@link org.apache.ignite.compute.ComputeTask#reduce(List)} method. */ -public class GridTaskFutureImpl<R> extends GridFutureAdapter<R> implements ComputeTaskFuture<R> { +public class GridTaskFutureImpl<R> extends GridFutureAdapter<R> implements InternalComputeTaskFuture<R> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/GridTaskSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTaskSessionImpl.java index 1466b76..58b0c3a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTaskSessionImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTaskSessionImpl.java @@ -820,7 +820,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> mapFuture() { + @Override public InternalFuture<?> mapFuture() { return mapFut; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteComputeImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteComputeImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteComputeImpl.java index 7205e76..fc391b6 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteComputeImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteComputeImpl.java @@ -61,8 +61,9 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { this.ctx = ctx; this.prj = prj; this.subjId = subjId; - - asyncSup = new IgniteAsyncSupportAdapter(async); + if (true) { + asyncSup = new IgniteAsyncSupportAdapter(async); + } } /** {@inheritDoc} */ @@ -89,14 +90,14 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public void affinityRun(@Nullable String cacheName, Object affKey, Runnable job) throws IgniteCheckedException { + @Override public void affinityRun(@Nullable String cacheName, Object affKey, Runnable job) throws IgniteException { A.notNull(affKey, "affKey"); A.notNull(job, "job"); guard(); try { - asyncSup.saveOrGet(ctx.closure().affinityRun(cacheName, affKey, job, prj.nodes())); + asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().affinityRun(cacheName, affKey, job, prj.nodes()))); } finally { unguard(); @@ -105,14 +106,14 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { /** {@inheritDoc} */ @Override public <R> R affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job) - throws IgniteCheckedException { + throws IgniteException { A.notNull(affKey, "affKey"); A.notNull(job, "job"); guard(); try { - return asyncSup.saveOrGet(ctx.closure().affinityCall(cacheName, affKey, job, prj.nodes())); + return asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().affinityCall(cacheName, affKey, job, prj.nodes()))); } finally { unguard(); @@ -120,7 +121,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public <T, R> R execute(String taskName, @Nullable T arg) throws IgniteCheckedException { + @Override public <T, R> R execute(String taskName, @Nullable T arg) throws IgniteException { A.notNull(taskName, "taskName"); guard(); @@ -129,7 +130,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - return (R)asyncSup.saveOrGet(ctx.task().execute(taskName, arg)); + return (R)asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.task().execute(taskName, arg))); } finally { unguard(); @@ -138,7 +139,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { /** {@inheritDoc} */ @Override public <T, R> R execute(Class<? extends ComputeTask<T, R>> taskCls, - @Nullable T arg) throws IgniteCheckedException { + @Nullable T arg) throws IgniteException { A.notNull(taskCls, "taskCls"); guard(); @@ -147,7 +148,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - return asyncSup.saveOrGet(ctx.task().execute(taskCls, arg)); + return asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.task().execute(taskCls, arg))); } finally { unguard(); @@ -155,7 +156,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public <T, R> R execute(ComputeTask<T, R> task, @Nullable T arg) throws IgniteCheckedException { + @Override public <T, R> R execute(ComputeTask<T, R> task, @Nullable T arg) throws IgniteException { A.notNull(task, "task"); guard(); @@ -164,7 +165,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - return asyncSup.saveOrGet(ctx.task().execute(task, arg)); + return asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.task().execute(task, arg))); } finally { unguard(); @@ -172,13 +173,13 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public void broadcast(Runnable job) throws IgniteCheckedException { + @Override public void broadcast(Runnable job) throws IgniteException { A.notNull(job, "job"); guard(); try { - asyncSup.saveOrGet(ctx.closure().runAsync(BROADCAST, job, prj.nodes())); + asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().runAsync(BROADCAST, job, prj.nodes()))); } finally { unguard(); @@ -186,13 +187,13 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public <R> Collection<R> broadcast(Callable<R> job) throws IgniteCheckedException { + @Override public <R> Collection<R> broadcast(Callable<R> job) throws IgniteException { A.notNull(job, "job"); guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(BROADCAST, Arrays.asList(job), prj.nodes())); + return asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().callAsync(BROADCAST, Arrays.asList(job), prj.nodes()))); } finally { unguard(); @@ -200,13 +201,13 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public <R, T> Collection<R> broadcast(IgniteClosure<T, R> job, @Nullable T arg) throws IgniteCheckedException { + @Override public <R, T> Collection<R> broadcast(IgniteClosure<T, R> job, @Nullable T arg) throws IgniteException { A.notNull(job, "job"); guard(); try { - return asyncSup.saveOrGet(ctx.closure().broadcast(job, arg, prj.nodes())); + return asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().broadcast(job, arg, prj.nodes()))); } finally { unguard(); @@ -214,13 +215,13 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public void run(Runnable job) throws IgniteCheckedException { + @Override public void run(Runnable job) throws IgniteException { A.notNull(job, "job"); guard(); try { - asyncSup.saveOrGet(ctx.closure().runAsync(BALANCE, job, prj.nodes())); + asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().runAsync(BALANCE, job, prj.nodes()))); } finally { unguard(); @@ -228,13 +229,13 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public void run(Collection<? extends Runnable> jobs) throws IgniteCheckedException { + @Override public void run(Collection<? extends Runnable> jobs) throws IgniteException { A.notEmpty(jobs, "jobs"); guard(); try { - asyncSup.saveOrGet(ctx.closure().runAsync(BALANCE, jobs, prj.nodes())); + asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().runAsync(BALANCE, jobs, prj.nodes()))); } finally { unguard(); @@ -242,13 +243,13 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public <R, T> R apply(IgniteClosure<T, R> job, @Nullable T arg) throws IgniteCheckedException { + @Override public <R, T> R apply(IgniteClosure<T, R> job, @Nullable T arg) throws IgniteException { A.notNull(job, "job"); guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(job, arg, prj.nodes())); + return asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().callAsync(job, arg, prj.nodes()))); } finally { unguard(); @@ -256,13 +257,13 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public <R> R call(Callable<R> job) throws IgniteCheckedException { + @Override public <R> R call(Callable<R> job) throws IgniteException { A.notNull(job, "job"); guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(BALANCE, job, prj.nodes())); + return asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().callAsync(BALANCE, job, prj.nodes()))); } finally { unguard(); @@ -270,13 +271,13 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public <R> Collection<R> call(Collection<? extends Callable<R>> jobs) throws IgniteCheckedException { + @Override public <R> Collection<R> call(Collection<? extends Callable<R>> jobs) throws IgniteException { A.notEmpty(jobs, "jobs"); guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(BALANCE, jobs, prj.nodes())); + return asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().callAsync(BALANCE, jobs, prj.nodes()))); } finally { unguard(); @@ -285,14 +286,14 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { /** {@inheritDoc} */ @Override public <T, R> Collection<R> apply(final IgniteClosure<T, R> job, - @Nullable Collection<? extends T> args) throws IgniteCheckedException { + @Nullable Collection<? extends T> args) throws IgniteException { A.notNull(job, "job"); A.notNull(args, "args"); guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(job, args, prj.nodes())); + return asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().callAsync(job, args, prj.nodes()))); } finally { unguard(); @@ -301,14 +302,14 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { /** {@inheritDoc} */ @Override public <R1, R2> R2 call(Collection<? extends Callable<R1>> jobs, IgniteReducer<R1, R2> rdc) - throws IgniteCheckedException { + throws IgniteException { A.notEmpty(jobs, "jobs"); A.notNull(rdc, "rdc"); guard(); try { - return asyncSup.saveOrGet(ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes())); + return asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes()))); } finally { unguard(); @@ -317,7 +318,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { /** {@inheritDoc} */ @Override public <R1, R2, T> R2 apply(IgniteClosure<T, R1> job, Collection<? extends T> args, - IgniteReducer<R1, R2> rdc) throws IgniteCheckedException { + IgniteReducer<R1, R2> rdc) throws IgniteException { A.notNull(job, "job"); A.notNull(rdc, "rdc"); A.notNull(args, "args"); @@ -325,7 +326,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(job, args, rdc, prj.nodes())); + return asyncSup.saveOrGet(new IgniteFutureAdapter<>(ctx.closure().callAsync(job, args, rdc, prj.nodes()))); } finally { unguard(); @@ -333,7 +334,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public <R> Map<IgniteUuid, ComputeTaskFuture<R>> activeTaskFutures() { + @Override public <R> Map<IgniteUuid, InternalComputeTaskFuture<R>> activeTaskFutures() { guard(); try { @@ -391,7 +392,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public void localDeployTask(Class<? extends ComputeTask> taskCls, ClassLoader clsLdr) throws IgniteCheckedException { + @Override public void localDeployTask(Class<? extends ComputeTask> taskCls, ClassLoader clsLdr) throws IgniteException { A.notNull(taskCls, "taskCls", clsLdr, "clsLdr"); guard(); @@ -402,6 +403,9 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { if (dep == null) throw new GridDeploymentException("Failed to deploy task (was task (re|un)deployed?): " + taskCls); } + catch (IgniteCheckedException e) { + throw U.wrap(e); + } finally { unguard(); } @@ -420,7 +424,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { } /** {@inheritDoc} */ - @Override public void undeployTask(String taskName) throws IgniteCheckedException { + @Override public void undeployTask(String taskName) throws IgniteException { A.notNull(taskName, "taskName"); guard(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteFutureAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteFutureAdapter.java new file mode 100644 index 0000000..ddb2c88 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteFutureAdapter.java @@ -0,0 +1,175 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * + */ +public class IgniteFutureAdapter<R> implements IgniteFuture<R> { + /** */ + protected final InternalFuture<R> delegate; + + /** */ + private volatile Map<IgniteInClosure<? super IgniteFuture<R>>, InClosureDelegate> listenersMap; + + /** + * @param delegate Delegate. + */ + public IgniteFutureAdapter(InternalFuture<R> delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public R get() throws IgniteException { + try { + return delegate.get(); + } + catch (IgniteCheckedException e) { + throw U.wrap(e); + } + } + + /** {@inheritDoc} */ + @Override public R get(long timeout) throws IgniteException { + try { + return delegate.get(timeout); + } + catch (IgniteCheckedException e) { + throw U.wrap(e); + } + } + + /** {@inheritDoc} */ + @Override public R get(long timeout, TimeUnit unit) throws IgniteException { + try { + return delegate.get(timeout, unit); + } + catch (IgniteCheckedException e) { + throw U.wrap(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean cancel() throws IgniteException { + try { + return delegate.cancel(); + } + catch (IgniteCheckedException e) { + throw U.wrap(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isDone() { + return delegate.isDone(); + } + + /** {@inheritDoc} */ + @Override public boolean isCancelled() { + return delegate.isCancelled(); + } + + /** {@inheritDoc} */ + @Override public long startTime() { + return delegate.startTime(); + } + + /** {@inheritDoc} */ + @Override public long duration() { + return delegate.duration(); + } + + /** {@inheritDoc} */ + @Override public void syncNotify(boolean syncNotify) { + delegate.syncNotify(syncNotify); + } + + /** {@inheritDoc} */ + @Override public boolean syncNotify() { + return delegate.syncNotify(); + } + + /** {@inheritDoc} */ + @Override public void concurrentNotify(boolean concurNotify) { + delegate.concurrentNotify(concurNotify); + } + + /** {@inheritDoc} */ + @Override public boolean concurrentNotify() { + return delegate.concurrentNotify(); + } + + /** {@inheritDoc} */ + @Override public void listenAsync(@Nullable final IgniteInClosure<? super IgniteFuture<R>> lsnr) { + Map<IgniteInClosure<? super IgniteFuture<R>>, InClosureDelegate> map = listenersMap; + + if (map == null) { + synchronized (this) { + map = Collections.synchronizedMap(new HashMap<IgniteInClosure<? super IgniteFuture<R>>, InClosureDelegate>()); + + listenersMap = map; + } + } + + InClosureDelegate closure = new InClosureDelegate((IgniteInClosure<IgniteFuture<R>>)lsnr); + + map.put(lsnr, closure); + + delegate.listenAsync(closure); + } + + /** {@inheritDoc} */ + @Override public void stopListenAsync(@Nullable IgniteInClosure<? super IgniteFuture<R>>... lsnr) { + Map<IgniteInClosure<? super IgniteFuture<R>>, InClosureDelegate> map = listenersMap; + + if (map == null) + return; + + InClosureDelegate inClosureDelegate = map.remove(lsnr); + + if (inClosureDelegate != null) + delegate.stopListenAsync(inClosureDelegate); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<T> chain(final IgniteClosure<? super IgniteFuture<R>, T> doneCb) { + return new IgniteFutureAdapter<>(delegate.chain(new IgniteClosure<InternalFuture<R>, T>() { + @Override public T apply(InternalFuture<R> future) { + return doneCb.apply(IgniteFutureAdapter.this); + } + })); + } + + /** + * + */ + private class InClosureDelegate implements IgniteInClosure<InternalFuture<R>> { + /** */ + private final IgniteInClosure<IgniteFuture<R>> userClosure; + + public InClosureDelegate(IgniteInClosure<IgniteFuture<R>> userClosure) { + this.userClosure = userClosure; + } + + /** {@inheritDoc} */ + @Override public void apply(InternalFuture<R> f) { + if (f == delegate) + userClosure.apply(IgniteFutureAdapter.this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteSchedulerImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteSchedulerImpl.java index e713c21..8dbe31b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteSchedulerImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteSchedulerImpl.java @@ -42,7 +42,7 @@ public class IgniteSchedulerImpl implements IgniteScheduler, Externalizable { } /** {@inheritDoc} */ - @Override public IgniteFuture<?> runLocal(Runnable r) { + @Override public InternalFuture<?> runLocal(Runnable r) { A.notNull(r, "r"); guard(); @@ -56,7 +56,7 @@ public class IgniteSchedulerImpl implements IgniteScheduler, Externalizable { } /** {@inheritDoc} */ - @Override public <R> IgniteFuture<R> callLocal(Callable<R> c) { + @Override public <R> InternalFuture<R> callLocal(Callable<R> c) { A.notNull(c, "c"); guard(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/KernalFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/KernalFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/KernalFuture.java new file mode 100644 index 0000000..b4f8497 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/KernalFuture.java @@ -0,0 +1,16 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal; + +/** + * + */ +public class KernalFuture<R> { +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/executor/GridExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/executor/GridExecutorService.java b/modules/core/src/main/java/org/gridgain/grid/kernal/executor/GridExecutorService.java index 8be6cc1..823904c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/executor/GridExecutorService.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/executor/GridExecutorService.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.executor; import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.lang.*; @@ -81,7 +80,7 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe private boolean isBeingShutdown; /** List of executing or scheduled for execution tasks. */ - private List<IgniteFuture<?>> futs = new ArrayList<>(); + private List<InternalFuture<?>> futs = new ArrayList<>(); /** Rejected or completed tasks listener. */ private TaskTerminateListener lsnr = new TaskTerminateListener<>(); @@ -145,7 +144,7 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe /** {@inheritDoc} */ @Override public List<Runnable> shutdownNow() { - List<IgniteFuture<?>> cpFuts; + List<InternalFuture<?>> cpFuts; // Cancel all tasks. synchronized (mux) { @@ -154,7 +153,7 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe isBeingShutdown = true; } - for (IgniteFuture<?> task : cpFuts) { + for (InternalFuture<?> task : cpFuts) { try { task.cancel(); } @@ -192,17 +191,17 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe if (end < 0) end = Long.MAX_VALUE; - List<IgniteFuture<?>> locTasks; + List<InternalFuture<?>> locTasks; // Cancel all tasks. synchronized (mux) { locTasks = new ArrayList<>(futs); } - Iterator<IgniteFuture<?>> iter = locTasks.iterator(); + Iterator<InternalFuture<?>> iter = locTasks.iterator(); while (iter.hasNext() && now < end) { - IgniteFuture<?> fut = iter.next(); + InternalFuture<?> fut = iter.next(); try { fut.get(end - now); @@ -255,8 +254,8 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe try { comp.run(task); - IgniteFuture<T> fut = comp.future().chain(new CX1<IgniteFuture<?>, T>() { - @Override public T applyx(IgniteFuture<?> fut) throws IgniteCheckedException { + InternalFuture<T> fut = comp.future().chain(new CX1<InternalFuture<?>, T>() { + @Override public T applyx(InternalFuture<?> fut) throws IgniteCheckedException { fut.get(); return res; @@ -339,14 +338,14 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe checkShutdown(); - Collection<IgniteFuture<T>> taskFuts = new ArrayList<>(); + Collection<InternalFuture<T>> taskFuts = new ArrayList<>(); assert comp.isAsync(); for (Callable<T> task : tasks) { // Execute task without predefined timeout. // GridFuture.cancel() will be called if timeout elapsed. - IgniteFuture<T> fut; + InternalFuture<T> fut; try { comp.call(task); @@ -365,7 +364,7 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe boolean isInterrupted = false; - for (IgniteFuture<T> fut : taskFuts) { + for (InternalFuture<T> fut : taskFuts) { if (!isInterrupted && now < end) { try { fut.get(end - now); @@ -397,7 +396,7 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe List<Future<T>> futs = new ArrayList<>(taskFuts.size()); // Convert futures. - for (IgniteFuture<T> fut : taskFuts) { + for (InternalFuture<T> fut : taskFuts) { // Per executor service contract any task that was not completed // should be cancelled upon return. if (!fut.isDone()) @@ -414,7 +413,7 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe * * @param fut Future to cancel. */ - private void cancelFuture(IgniteFuture<?> fut) { + private void cancelFuture(InternalFuture<?> fut) { try { fut.cancel(); } @@ -479,13 +478,13 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe checkShutdown(); - Collection<IgniteFuture<T>> taskFuts = new ArrayList<>(); + Collection<InternalFuture<T>> taskFuts = new ArrayList<>(); assert comp.isAsync(); for (Callable<T> cmd : tasks) { // Execute task with predefined timeout. - IgniteFuture<T> fut; + InternalFuture<T> fut; try { @@ -508,7 +507,7 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe int errCnt = 0; - for (IgniteFuture<T> fut : taskFuts) { + for (InternalFuture<T> fut : taskFuts) { now = U.currentTimeMillis(); boolean cancel = false; @@ -522,7 +521,7 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe // Cancel next tasks (avoid current task cancellation below in loop). continue; } - catch (IgniteFutureTimeoutException ignored) { + catch (InternalFutureTimeoutException ignored) { if (log.isDebugEnabled()) log.debug("Timeout occurred during getting task result: " + fut); @@ -595,7 +594,7 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe * @return Future for command. */ @SuppressWarnings("unchecked") - private <T> Future<T> addFuture(IgniteFuture<T> fut) { + private <T> Future<T> addFuture(InternalFuture<T> fut) { synchronized (mux) { if (!fut.isDone()) { fut.listenAsync(lsnr); @@ -610,12 +609,12 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe /** * Listener to track tasks. */ - private class TaskTerminateListener<T> implements IgniteInClosure<IgniteFuture<T>> { + private class TaskTerminateListener<T> implements IgniteInClosure<InternalFuture<T>> { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void apply(IgniteFuture<T> taskFut) { + @Override public void apply(InternalFuture<T> taskFut) { synchronized (mux) { futs.remove(taskFut); } @@ -623,20 +622,20 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe } /** - * Wrapper for {@link org.apache.ignite.lang.IgniteFuture}. + * Wrapper for {@link InternalFuture}. * Used for compatibility {@link Future} interface. * @param <T> The result type of the {@link Future} argument. */ private class TaskFutureWrapper<T> implements Future<T> { /** */ - private final IgniteFuture<T> fut; + private final InternalFuture<T> fut; /** * Creates wrapper. * * @param fut Grid future. */ - TaskFutureWrapper(IgniteFuture<T> fut) { + TaskFutureWrapper(InternalFuture<T> fut) { assert fut != null; this.fut = fut; @@ -702,7 +701,7 @@ public class GridExecutorService extends GridMetadataAwareAdapter implements Exe return res; } - catch (IgniteFutureTimeoutException e) { + catch (InternalFutureTimeoutException e) { TimeoutException e2 = new TimeoutException(); e2.initCause(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/ggfs/common/GridGgfsLogger.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/ggfs/common/GridGgfsLogger.java b/modules/core/src/main/java/org/gridgain/grid/kernal/ggfs/common/GridGgfsLogger.java index b9fcf3a..52b45c8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/ggfs/common/GridGgfsLogger.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/ggfs/common/GridGgfsLogger.java @@ -436,7 +436,7 @@ public final class GridGgfsLogger { try { U.join(flushWorker); } - catch (GridInterruptedException ignore) { + catch (InternalInterruptedException ignore) { // No-op. } @@ -686,7 +686,7 @@ public final class GridGgfsLogger { try { U.await(flushCond, 1000L, TimeUnit.MILLISECONDS); } - catch (GridInterruptedException ignore) { + catch (InternalInterruptedException ignore) { t.interrupt(); break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/discovery/GridDiscoveryManager.java index 54994d1..0c20c44 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/discovery/GridDiscoveryManager.java @@ -941,7 +941,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param awaitVer Topology version to await. * @return Future. */ - public IgniteFuture<Long> topologyFuture(final long awaitVer) { + public InternalFuture<Long> topologyFuture(final long awaitVer) { long topVer = topologyVersion(); if (topVer >= awaitVer) @@ -1585,7 +1585,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** {@inheritDoc} */ - @Override protected void body() throws GridInterruptedException { + @Override protected void body() throws InternalInterruptedException { while (!isCancelled()) { U.sleep(METRICS_UPDATE_FREQ); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/eventstorage/GridEventStorageManager.java index 87da0a9..b0f6cc7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/eventstorage/GridEventStorageManager.java @@ -654,7 +654,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> * @param types Event types to wait for. * @return Event future. */ - public <T extends IgniteEvent> IgniteFuture<T> waitForEvent(@Nullable final IgnitePredicate<T> p, + public <T extends IgniteEvent> InternalFuture<T> waitForEvent(@Nullable final IgnitePredicate<T> p, @Nullable int... types) { final GridFutureAdapter<T> fut = new GridFutureAdapter<>(ctx); @@ -753,7 +753,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> * @param timeout Maximum time to wait for result, if {@code 0}, then wait until result is received. * @return Collection of events. */ - public <T extends IgniteEvent> IgniteFuture<List<T>> remoteEventsAsync(final IgnitePredicate<T> p, + public <T extends IgniteEvent> InternalFuture<List<T>> remoteEventsAsync(final IgnitePredicate<T> p, final Collection<? extends ClusterNode> nodes, final long timeout) { assert p != null; assert nodes != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java index 926f717..4c1934e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java @@ -14,7 +14,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.lang.*; import org.apache.ignite.portables.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -205,7 +204,7 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version to await for. * @return Future that will be completed after affinity for topology version {@code topVer} is calculated. */ - public IgniteFuture<Long> readyFuture(long topVer) { + public InternalFuture<Long> readyFuture(long topVer) { GridAffinityAssignment aff = head.get(); if (aff.topologyVersion() >= topVer) { @@ -339,7 +338,7 @@ public class GridAffinityAssignmentCache { log.debug("Will wait for topology version [locNodeId=" + ctx.localNodeId() + ", topVer=" + topVer + ']'); - IgniteFuture<Long> fut = readyFuture(topVer); + InternalFuture<Long> fut = readyFuture(topVer); if (fut != null) fut.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90948e67/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java index 74a1a49..d5a3bc7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java @@ -53,7 +53,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { private static final String NULL_NAME = U.id8(UUID.randomUUID()); /** Affinity map. */ - private final ConcurrentMap<AffinityAssignmentKey, IgniteFuture<AffinityInfo>> affMap = new ConcurrentHashMap8<>(); + private final ConcurrentMap<AffinityAssignmentKey, InternalFuture<AffinityInfo>> affMap = new ConcurrentHashMap8<>(); /** Listener. */ private final GridLocalEventListener lsnr = new GridLocalEventListener() { @@ -247,7 +247,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { private AffinityInfo affinityCache(@Nullable final String cacheName, long topVer) throws IgniteCheckedException { AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer); - IgniteFuture<AffinityInfo> fut = affMap.get(key); + InternalFuture<AffinityInfo> fut = affMap.get(key); if (fut != null) return fut.get(); @@ -264,7 +264,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { new GridAffinityAssignment(topVer, cctx.affinity().assignments(topVer)), cctx.portableEnabled()); - IgniteFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(ctx, info)); + InternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(ctx, info)); if (old != null) info = old.get(); @@ -285,7 +285,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>(); - IgniteFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0); + InternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0); if (old != null) return old.get();