# ignite-26
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cf72dda0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cf72dda0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cf72dda0 Branch: refs/heads/ignite-106 Commit: cf72dda0913a86f0cf4c0a7eb7d477572e4371fa Parents: 0693fa9 Author: sboikov <sboi...@gridgain.com> Authored: Mon Feb 2 17:01:38 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Feb 2 17:01:38 2015 +0300 ---------------------------------------------------------------------- .../ignite/compute/ComputeTaskSession.java | 2 +- .../internal/ComputeTaskInternalFuture.java | 139 +++++++++++++ .../ignite/internal/GridJobSessionImpl.java | 4 +- .../ignite/internal/GridTaskSessionImpl.java | 10 +- .../closure/GridClosureProcessor.java | 104 +++++----- .../util/future/IgniteFinishedFutureImpl.java | 7 + .../org/apache/ignite/GridTestTaskSession.java | 2 +- .../internal/GridSpiExceptionSelfTest.java | 8 +- .../IgniteComputeEmptyClusterGroupTest.java | 198 +++++++++++++++++++ .../collision/GridTestCollisionTaskSession.java | 2 +- .../testsuites/IgniteComputeGridTestSuite.java | 1 + 11 files changed, 416 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java index 9dc83fc..3e5f805 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java @@ -440,5 +440,5 @@ public interface ComputeTaskSession { * * @return Future that will be completed when task "<tt>map</tt>" step has completed. */ - public IgniteInternalFuture<?> mapFuture(); + public IgniteFuture<?> mapFuture(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java index a6a6004..a5a4574 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java @@ -21,9 +21,11 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; import java.io.*; import java.util.*; @@ -70,6 +72,143 @@ public class ComputeTaskInternalFuture<R> extends GridFutureAdapter<R> { } /** + * @param ctx Context. + * @param taskCls Task class. + * @param e Error. + * @return Finished task future. + */ + public static <R> ComputeTaskInternalFuture<R> finishedFuture(final GridKernalContext ctx, + final Class<?> taskCls, + IgniteCheckedException e) { + assert ctx != null; + assert taskCls != null; + assert e != null; + + final long time = U.currentTimeMillis(); + + final IgniteUuid id = IgniteUuid.fromUuid(ctx.localNodeId()); + + ComputeTaskSession ses = new ComputeTaskSession() { + @Override public String getTaskName() { + return taskCls.getName(); + } + + @Override public UUID getTaskNodeId() { + return ctx.localNodeId(); + } + + @Override public long getStartTime() { + return time; + } + + @Override public long getEndTime() { + return time; + } + + @Override public IgniteUuid getId() { + return id; + } + + @Override public ClassLoader getClassLoader() { + return null; + } + + @Override public Collection<ComputeJobSibling> getJobSiblings() throws IgniteException { + return Collections.emptyList(); + } + + @Override public Collection<ComputeJobSibling> refreshJobSiblings() throws IgniteException { + return Collections.emptyList(); + } + + @Nullable @Override public ComputeJobSibling getJobSibling(IgniteUuid jobId) throws IgniteException { + return null; + } + + @Override public void setAttribute(Object key, @Nullable Object val) throws IgniteException { + } + + @Nullable @Override public <K, V> V getAttribute(K key) { + return null; + } + + @Override public void setAttributes(Map<?, ?> attrs) throws IgniteException { + // No-op. + } + + @Override public Map<?, ?> getAttributes() { + return Collections.emptyMap(); + } + + @Override public void addAttributeListener(ComputeTaskSessionAttributeListener lsnr, boolean rewind) { + // No-op. + } + + @Override public boolean removeAttributeListener(ComputeTaskSessionAttributeListener lsnr) { + return false; + } + + @Override public <K, V> V waitForAttribute(K key, long timeout) throws InterruptedException { + throw new InterruptedException("Session was closed."); + } + + @Override public <K, V> boolean waitForAttribute(K key, @Nullable V val, long timeout) throws InterruptedException { + throw new InterruptedException("Session was closed."); + } + + @Override public Map<?, ?> waitForAttributes(Collection<?> keys, long timeout) throws InterruptedException { + throw new InterruptedException("Session was closed."); + } + + @Override public boolean waitForAttributes(Map<?, ?> attrs, long timeout) throws InterruptedException { + throw new InterruptedException("Session was closed."); + } + + @Override public void saveCheckpoint(String key, Object state) { + throw new IgniteException("Session was closed."); + } + + @Override public void saveCheckpoint(String key, + Object state, + ComputeTaskSessionScope scope, + long timeout) + { + throw new IgniteException("Session was closed."); + } + + @Override public void saveCheckpoint(String key, + Object state, + ComputeTaskSessionScope scope, + long timeout, + boolean overwrite) { + throw new IgniteException("Session was closed."); + } + + @Nullable @Override public <T> T loadCheckpoint(String key) throws IgniteException { + throw new IgniteException("Session was closed."); + } + + @Override public boolean removeCheckpoint(String key) throws IgniteException { + throw new IgniteException("Session was closed."); + } + + @Override public Collection<UUID> getTopology() { + return Collections.emptyList(); + } + + @Override public IgniteFuture<?> mapFuture() { + return new IgniteFinishedFutureImpl<Object>(ctx); + } + }; + + ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx); + + fut.onDone(e); + + return fut; + } + + /** * @return Future returned by public API. */ public ComputeTaskFuture<R> publicFuture() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java index ef90408..5b26961 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java @@ -291,8 +291,8 @@ public class GridJobSessionImpl implements GridTaskSessionInternal { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> mapFuture() { - return new GridFinishedFuture<>(ctx); + @Override public IgniteFuture<?> mapFuture() { + return new IgniteFinishedFutureImpl<>(ctx, null); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java index 10283f8..be9ade4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java @@ -97,7 +97,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { private final UUID subjId; /** */ - private final GridFutureAdapter mapFut; + private final IgniteFutureImpl mapFut; /** * @param taskNodeId Task node ID. @@ -156,7 +156,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { this.fullSup = fullSup; this.subjId = subjId; - mapFut = new GridFutureAdapter(ctx); + mapFut = new IgniteFutureImpl(new GridFutureAdapter(ctx)); } /** {@inheritDoc} */ @@ -832,18 +832,18 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { * Task map callback. */ public void onMapped() { - mapFut.onDone(); + ((GridFutureAdapter)mapFut.internalFuture()).onDone(); } /** * Finish task callback. */ public void onDone() { - mapFut.onDone(); + ((GridFutureAdapter)mapFut.internalFuture()).onDone(); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> mapFuture() { + @Override public IgniteFuture<?> mapFuture() { return mapFut; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index f521161..19d15b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -124,7 +124,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Task execution future. */ - public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs, + public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs, @Nullable Collection<ClusterNode> nodes) { return runAsync(mode, jobs, nodes, false); } @@ -136,18 +136,19 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param sys If {@code true}, then system pool will be used. * @return Task execution future. */ - public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs, - @Nullable Collection<ClusterNode> nodes, boolean sys) { + public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, + Collection<? extends Runnable> jobs, + @Nullable Collection<ClusterNode> nodes, + boolean sys) + { assert mode != null; + assert !F.isEmpty(jobs) : jobs; enterBusy(); try { - if (F.isEmpty(jobs)) - return new GridFinishedFuture(ctx); - if (F.isEmpty(nodes)) - return new GridFinishedFuture(ctx, U.emptyTopologyException()); + return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class, U.emptyTopologyException()); ctx.task().setThreadContext(TC_SUBGRID, nodes); @@ -164,7 +165,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Task execution future. */ - public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job, + public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job, @Nullable Collection<ClusterNode> nodes) { return runAsync(mode, job, nodes, false); } @@ -176,18 +177,19 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param sys If {@code true}, then system pool will be used. * @return Task execution future. */ - public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job, - @Nullable Collection<ClusterNode> nodes, boolean sys) { + public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, + Runnable job, + @Nullable Collection<ClusterNode> nodes, + boolean sys) + { assert mode != null; + assert job != null; enterBusy(); try { - if (job == null) - return new GridFinishedFuture(ctx); - if (F.isEmpty(nodes)) - return new GridFinishedFuture(ctx, U.emptyTopologyException()); + return ComputeTaskInternalFuture.finishedFuture(ctx, T2.class, U.emptyTopologyException()); ctx.task().setThreadContext(TC_SUBGRID, nodes); @@ -314,19 +316,20 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R2> Type. * @return Reduced result. */ - public <R1, R2> IgniteInternalFuture<R2> forkjoinAsync(GridClosureCallMode mode, - @Nullable Collection<? extends Callable<R1>> jobs, - @Nullable IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) { + public <R1, R2> ComputeTaskInternalFuture<R2> forkjoinAsync(GridClosureCallMode mode, + Collection<? extends Callable<R1>> jobs, + IgniteReducer<R1, R2> rdc, + @Nullable Collection<ClusterNode> nodes) + { assert mode != null; + assert rdc != null; + assert !F.isEmpty(jobs); enterBusy(); try { - if (F.isEmpty(jobs) || rdc == null) - return new GridFinishedFuture<>(ctx); - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + return ComputeTaskInternalFuture.finishedFuture(ctx, T3.class, U.emptyTopologyException()); ctx.task().setThreadContext(TC_SUBGRID, nodes); @@ -344,7 +347,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R> Type. * @return Grid future for collection of closure results. */ - public <R> IgniteInternalFuture<Collection<R>> callAsync( + public <R> ComputeTaskInternalFuture<Collection<R>> callAsync( GridClosureCallMode mode, @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes) { @@ -359,19 +362,19 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R> Type. * @return Grid future for collection of closure results. */ - public <R> IgniteInternalFuture<Collection<R>> callAsync(GridClosureCallMode mode, - @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes, - boolean sys) { + public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(GridClosureCallMode mode, + Collection<? extends Callable<R>> jobs, + @Nullable Collection<ClusterNode> nodes, + boolean sys) + { assert mode != null; + assert !F.isEmpty(jobs); enterBusy(); try { - if (F.isEmpty(jobs)) - return new GridFinishedFuture<>(ctx); - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + return ComputeTaskInternalFuture.finishedFuture(ctx, T6.class, U.emptyTopologyException()); ctx.task().setThreadContext(TC_SUBGRID, nodes); @@ -390,7 +393,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R> Type. * @return Grid future for collection of closure results. */ - public <R> IgniteInternalFuture<R> callAsync(GridClosureCallMode mode, + public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode, @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) { return callAsync(mode, job, nodes, false); } @@ -402,13 +405,13 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Job future. */ - public <R> IgniteInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job, + public <R> ComputeTaskInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job, @Nullable Collection<ClusterNode> nodes) { enterBusy(); try { if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException()); // In case cache key is passed instead of affinity key. final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); @@ -418,7 +421,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T5<>(cacheName, affKey0, job), null, false); } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, e); + return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e); } finally { leaveBusy(); @@ -432,13 +435,13 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Job future. */ - public IgniteInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job, + public ComputeTaskInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job, @Nullable Collection<ClusterNode> nodes) { enterBusy(); try { if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException()); // In case cache key is passed instead of affinity key. final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); @@ -448,7 +451,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T4(cacheName, affKey0, job), null, false); } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, e); + return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e); } finally { leaveBusy(); @@ -526,18 +529,19 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R> Type. * @return Grid future for collection of closure results. */ - public <R> IgniteInternalFuture<R> callAsync(GridClosureCallMode mode, - @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes, boolean sys) { + public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode, + Callable<R> job, + @Nullable Collection<ClusterNode> nodes, + boolean sys) + { assert mode != null; + assert job != null; enterBusy(); try { - if (job == null) - return new GridFinishedFuture<>(ctx); - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + return ComputeTaskInternalFuture.finishedFuture(ctx, T7.class, U.emptyTopologyException()); ctx.task().setThreadContext(TC_SUBGRID, nodes); @@ -554,13 +558,13 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Grid future for execution result. */ - public <T, R> IgniteInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg, + public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg, @Nullable Collection<ClusterNode> nodes) { enterBusy(); try { if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + return ComputeTaskInternalFuture.finishedFuture(ctx, T8.class, U.emptyTopologyException()); ctx.task().setThreadContext(TC_SUBGRID, nodes); @@ -624,13 +628,15 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Grid future for execution result. */ - public <T, R> IgniteInternalFuture<Collection<R>> callAsync(IgniteClosure<T, R> job, @Nullable Collection<? extends T> args, - @Nullable Collection<ClusterNode> nodes) { + public <T, R> ComputeTaskInternalFuture<Collection<R>> callAsync(IgniteClosure<T, R> job, + @Nullable Collection<? extends T> args, + @Nullable Collection<ClusterNode> nodes) + { enterBusy(); try { if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + return ComputeTaskInternalFuture.finishedFuture(ctx, T9.class, U.emptyTopologyException()); ctx.task().setThreadContext(TC_SUBGRID, nodes); @@ -648,13 +654,13 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Grid future for execution result. */ - public <T, R1, R2> IgniteInternalFuture<R2> callAsync(IgniteClosure<T, R1> job, + public <T, R1, R2> ComputeTaskInternalFuture<R2> callAsync(IgniteClosure<T, R1> job, Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) { enterBusy(); try { if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + return ComputeTaskInternalFuture.finishedFuture(ctx, T10.class, U.emptyTopologyException()); ctx.task().setThreadContext(TC_SUBGRID, nodes); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java index 3aa9f4d..6af2d2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java @@ -30,4 +30,11 @@ public class IgniteFinishedFutureImpl<V> extends IgniteFutureImpl<V> { public IgniteFinishedFutureImpl(GridKernalContext ctx, Throwable err) { super(new GridFinishedFuture<V>(ctx, err)); } + + /** + * @param ctx Context. + */ + public IgniteFinishedFutureImpl(GridKernalContext ctx) { + super(new GridFinishedFuture<>(ctx, (V)null)); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java b/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java index 5147b3d..2cb0ee9 100644 --- a/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java +++ b/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java @@ -212,7 +212,7 @@ public class GridTestTaskSession implements ComputeTaskSession { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> mapFuture() { + @Override public IgniteFuture<?> mapFuture() { assert false : "Not implemented"; return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java index 50cff92..559b31c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java @@ -78,9 +78,13 @@ public class GridSpiExceptionSelfTest extends GridCommonAbstractTest { assert false : "Exception should be thrown"; } catch (IgniteException e) { - assert e.getCause() instanceof GridTestSpiException : "Wrong cause exception type. " + e; + assertTrue(e.getCause() instanceof IgniteCheckedException); - assert e.getCause().getMessage().startsWith(TEST_MSG) : "Wrong exception message." + e.getMessage(); + Throwable err = e.getCause().getCause(); + + assert err instanceof GridTestSpiException : "Wrong cause exception type. " + e; + + assert err.getMessage().startsWith(TEST_MSG) : "Wrong exception message." + e.getMessage(); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java new file mode 100644 index 0000000..bdbdd86 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testAsync() throws Exception { + ClusterGroup empty = ignite(0).cluster().forNodeId(UUID.randomUUID()); + + assertEquals(0, empty.nodes().size()); + + IgniteCompute comp = ignite(0).compute(empty).withAsync(); + + comp.affinityRun(null, 1, new FailRunnable()); + + checkFutureFails(comp); + + comp.apply(new FailClosure(), new Object()); + + checkFutureFails(comp); + + comp.affinityCall(null, 1, new FailCallable()); + + checkFutureFails(comp); + + comp.broadcast(new FailCallable()); + + checkFutureFails(comp); + } + + /** + * @throws Exception If failed. + */ + public void testSync() throws Exception { + ClusterGroup empty = ignite(0).cluster().forNodeId(UUID.randomUUID()); + + assertEquals(0, empty.nodes().size()); + + final IgniteCompute comp = ignite(0).compute(empty); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override + public Void call() throws Exception { + comp.affinityRun(null, 1, new FailRunnable()); + + return null; + } + }, ClusterGroupEmptyException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + comp.apply(new FailClosure(), new Object()); + + return null; + } + }, ClusterGroupEmptyException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override + public Void call() throws Exception { + comp.affinityCall(null, 1, new FailCallable()); + + return null; + } + }, ClusterGroupEmptyException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + comp.broadcast(new FailCallable()); + + return null; + } + }, ClusterGroupEmptyException.class, null); + } + + /** + * @param comp Compute. + */ + private void checkFutureFails(IgniteCompute comp) { + final ComputeTaskFuture fut = comp.future(); + + assertNotNull(fut); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + fut.get(); + + return null; + } + }, ClusterGroupEmptyException.class, null); + } + + /** + * + */ + private static class FailClosure implements IgniteClosure<Object, Object> { + /** {@inheritDoc} */ + @Override public Object apply(Object o) { + fail(); + + return null; + } + } + + /** + * + */ + private static class FailRunnable implements Runnable { + /** {@inheritDoc} */ + @Override public void run() { + fail(); + } + } + + /** + * + */ + private static class FailCallable implements Callable<Object> { + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + fail(); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java b/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java index fc2cd42..d15b048 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java @@ -199,7 +199,7 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> mapFuture() { + @Override public IgniteFuture<?> mapFuture() { assert false : "Not implemented"; return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java index 0e83e66..efe7e5f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java @@ -92,6 +92,7 @@ public class IgniteComputeGridTestSuite { suite.addTestSuite(GridMultinodeRedeploySharedModeSelfTest.class); suite.addTestSuite(GridMultinodeRedeployPrivateModeSelfTest.class); suite.addTestSuite(GridMultinodeRedeployIsolatedModeSelfTest.class); + suite.addTestSuite(IgniteComputeEmptyClusterGroupTest.class); return suite; }