http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTaskWithPredicateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTaskWithPredicateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTaskWithPredicateSelfTest.java new file mode 100644 index 0000000..d41c325 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTaskWithPredicateSelfTest.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.failover.*; +import org.apache.ignite.spi.failover.always.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Test failover of a task with Node filter predicate. + */ +@GridCommonTest(group = "Kernal Self") +public class GridFailoverTaskWithPredicateSelfTest extends GridCommonAbstractTest { + /** First node's name. */ + private static final String NODE1 = "NODE1"; + + /** Second node's name. */ + private static final String NODE2 = "NODE2"; + + /** Third node's name. */ + private static final String NODE3 = "NODE3"; + + /** Predicate to exclude the second node from topology */ + private final IgnitePredicate<ClusterNode> p = new IgnitePredicate<ClusterNode>() { + @Override + public boolean apply(ClusterNode e) { + return !NODE2.equals(e.attribute(GridNodeAttributes.ATTR_GRID_NAME)); + } + }; + + /** Whether delegating fail over node was found or not. */ + private final AtomicBoolean routed = new AtomicBoolean(); + + /** Whether job execution failed with exception. 
*/ + private final AtomicBoolean failed = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setFailoverSpi(new AlwaysFailoverSpi() { + /** {@inheritDoc} */ + @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> grid) { + ClusterNode failoverNode = super.failover(ctx, grid); + + if (failoverNode != null) + routed.set(true); + else + routed.set(false); + + return failoverNode; + } + }); + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + return cfg; + } + + /** + * Tests that failover doesn't happen on two-node grid when the Task is applicable only for the first node + * and fails on it. + * + * @throws Exception If failed. + */ + public void testJobNotFailedOver() throws Exception { + failed.set(false); + routed.set(false); + + try { + Ignite ignite1 = startGrid(NODE1); + Ignite ignite2 = startGrid(NODE2); + + assert ignite1 != null; + assert ignite2 != null; + + compute(ignite1.cluster().forPredicate(p)).withTimeout(10000).execute(JobFailTask.class.getName(), "1"); + } + catch (ClusterTopologyException ignored) { + failed.set(true); + } + finally { + assertTrue(failed.get()); + assertFalse(routed.get()); + + stopGrid(NODE1); + stopGrid(NODE2); + } + } + + /** + * Tests that failover happens on three-node grid when the Task is applicable for the first node + * and fails on it, but is also applicable on another node. + * + * @throws Exception If failed. + */ + public void testJobFailedOver() throws Exception { + failed.set(false); + routed.set(false); + + try { + Ignite ignite1 = startGrid(NODE1); + Ignite ignite2 = startGrid(NODE2); + Ignite ignite3 = startGrid(NODE3); + + assert ignite1 != null; + assert ignite2 != null; + assert ignite3 != null; + + Integer res = (Integer)compute(ignite1.cluster().forPredicate(p)).withTimeout(10000). 
+ execute(JobFailTask.class.getName(), "1"); + + assert res == 1; + } + catch (ClusterTopologyException ignored) { + failed.set(true); + } + finally { + assertFalse(failed.get()); + assertTrue(routed.get()); + + stopGrid(NODE1); + stopGrid(NODE2); + stopGrid(NODE3); + } + } + + /** + * Tests that in case of failover our predicate is intersected with projection + * (logical AND is performed). + * + * @throws Exception If error happens. + */ + public void testJobNotFailedOverWithStaticProjection() throws Exception { + failed.set(false); + routed.set(false); + + try { + Ignite ignite1 = startGrid(NODE1); + Ignite ignite2 = startGrid(NODE2); + Ignite ignite3 = startGrid(NODE3); + + assert ignite1 != null; + assert ignite2 != null; + assert ignite3 != null; + + // Get projection only for first 2 nodes. + ClusterGroup nodes = ignite1.cluster().forNodeIds(Arrays.asList( + ignite1.cluster().localNode().id(), + ignite2.cluster().localNode().id())); + + // On failover NODE3 shouldn't be taken into account. + Integer res = (Integer)compute(nodes.forPredicate(p)).withTimeout(10000). + execute(JobFailTask.class.getName(), "1"); + + assert res == 1; + } + catch (ClusterTopologyException ignored) { + failed.set(true); + } + finally { + assertTrue(failed.get()); + assertFalse(routed.get()); + + stopGrid(NODE1); + stopGrid(NODE2); + stopGrid(NODE3); + } + } + + /** */ + @ComputeTaskSessionFullSupport + private static class JobFailTask implements ComputeTask<String, Object> { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + /** {@inheritDoc} */ + @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) throws IgniteCheckedException { + ses.setAttribute("fail", true); + + return Collections.singletonMap(new ComputeJobAdapter(arg) { + /** {@inheritDoc} */ + @SuppressWarnings({"RedundantTypeArguments"}) + @Override + public Serializable execute() throws IgniteCheckedException { + boolean fail; + + try { + fail = ses.<String, Boolean>waitForAttribute("fail", 0); + } + catch (InterruptedException e) { + throw new IgniteCheckedException("Got interrupted while waiting for attribute to be set.", e); + } + + if (fail) { + ses.setAttribute("fail", false); + + throw new IgniteCheckedException("Job exception."); + } + + // This job does not return any result. + return Integer.parseInt(this.<String>argument(0)); + } + }, subgrid.get(0)); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) + throws IgniteCheckedException { + if (res.getException() != null && !(res.getException() instanceof ComputeUserUndeclaredException)) + return ComputeJobResultPolicy.FAILOVER; + + return ComputeJobResultPolicy.REDUCE; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results.size() == 1; + + return results.get(0).getData(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTopologySelfTest.java new file mode 100644 index 0000000..3a99f4e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTopologySelfTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.failover.*; +import org.apache.ignite.spi.failover.always.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Test failover and topology. It don't pick local node if it has been excluded from topology. 
+ */ +@GridCommonTest(group = "Kernal Self") +public class GridFailoverTopologySelfTest extends GridCommonAbstractTest { + /** */ + private final AtomicBoolean failed = new AtomicBoolean(false); + + /** */ + public GridFailoverTopologySelfTest() { + super(/*start Grid*/false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setNodeId(null); + + cfg.setFailoverSpi(new AlwaysFailoverSpi() { + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> grid) { + if (grid.size() != 1) { + failed.set(true); + + error("Unexpected grid size [expected=1, grid=" + grid + ']'); + } + + UUID locNodeId = ignite.configuration().getNodeId(); + + for (ClusterNode node : grid) { + if (node.id().equals(locNodeId)) { + failed.set(true); + + error("Grid shouldn't contain local node [localNodeId=" + locNodeId + ", grid=" + grid + ']'); + } + } + + return super.failover(ctx, grid); + } + }); + + return cfg; + } + + /** + * Tests that failover don't pick local node if it has been excluded from topology. + * + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testFailoverTopology() throws Exception { + try { + Ignite ignite1 = startGrid(1); + + startGrid(2); + + ignite1.compute().localDeployTask(JobFailTask.class, JobFailTask.class.getClassLoader()); + + try { + compute(ignite1.cluster().forRemotes()).execute(JobFailTask.class, null); + } + catch (IgniteCheckedException e) { + info("Got expected grid exception: " + e); + } + + assert !failed.get(); + } + finally { + stopGrid(1); + stopGrid(2); + } + } + + /** */ + private static class JobFailTask implements ComputeTask<String, Object> { + /** Ignite instance. 
*/ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private boolean jobFailedOver; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) throws IgniteCheckedException { + assert ignite != null; + + UUID locNodeId = ignite.configuration().getNodeId(); + + assert locNodeId != null; + + ClusterNode remoteNode = null; + + for (ClusterNode node : subgrid) { + if (!node.id().equals(locNodeId)) + remoteNode = node; + } + + return Collections.singletonMap(new ComputeJobAdapter(arg) { + @Override public Serializable execute() throws IgniteCheckedException { + throw new IgniteCheckedException("Job exception."); + } + }, remoteNode); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) throws IgniteCheckedException { + if (res.getException() != null && !jobFailedOver) { + jobFailedOver = true; + + return ComputeJobResultPolicy.FAILOVER; + } + + return ComputeJobResultPolicy.REDUCE; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results.size() == 1; + + return results.get(0).getData(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridHomePathSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridHomePathSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridHomePathSelfTest.java new file mode 100644 index 0000000..f3cf621 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridHomePathSelfTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import static org.apache.ignite.IgniteSystemProperties.*; + +/** + * + */ +public class GridHomePathSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost(getTestResources().getLocalHost()); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testHomeOverride() throws Exception { + try { + startGrid(0); + + // Test home override. + IgniteConfiguration c = getConfiguration(getTestGridName(1)); + + c.setGridGainHome("/new/path"); + + try { + G.start(c); + + assert false : "Exception should have been thrown."; + } + catch (Exception e) { + if (X.hasCause(e, IgniteException.class)) + info("Caught expected exception: " + e); + else + throw e; + } + + // Test no override. 
+ IgniteConfiguration c1 = getConfiguration(getTestGridName(1)); + + c1.setGridGainHome(System.getProperty(GG_HOME)); + + G.start(c1); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobCheckpointCleanupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobCheckpointCleanupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobCheckpointCleanupSelfTest.java new file mode 100644 index 0000000..ea2d4bc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobCheckpointCleanupSelfTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.checkpoint.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Test for checkpoint cleanup. + */ +public class GridJobCheckpointCleanupSelfTest extends GridCommonAbstractTest { + /** Number of currently alive checkpoints. */ + private final AtomicInteger cntr = new AtomicInteger(); + + /** Checkpoint. */ + private CheckpointSpi checkpointSpi; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setCheckpointSpi(checkpointSpi); + + return c; + } + + /** + * Spawns one job on the node other than task node and + * ensures that all checkpoints were removed after task completion. + * + * @throws Exception if failed. + */ + public void testCheckpointCleanup() throws Exception { + try { + checkpointSpi = new TestCheckpointSpi("task-checkpoints", cntr); + + Ignite taskIgnite = startGrid(0); + + checkpointSpi = new TestCheckpointSpi("job-checkpoints", cntr); + + Ignite jobIgnite = startGrid(1); + + taskIgnite.compute().execute(new CheckpointCountingTestTask(), jobIgnite.cluster().localNode()); + } + finally { + stopAllGrids(); + } + + assertEquals(cntr.get(), 0); + } + + /** + * Test checkpoint SPI. + */ + @IgniteSpiMultipleInstancesSupport(true) + private static class TestCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi { + /** Counter. */ + private final AtomicInteger cntr; + + /** + * @param name Name. + * @param cntr Counter. 
+ */ + TestCheckpointSpi(String name, AtomicInteger cntr) { + setName(name); + + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public byte[] loadCheckpoint(String key) throws IgniteSpiException { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) + throws IgniteSpiException { + cntr.incrementAndGet(); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean removeCheckpoint(String key) { + cntr.decrementAndGet(); + + return true; + } + + /** {@inheritDoc} */ + @Override public void setCheckpointListener(CheckpointListener lsnr) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + // No-op. + } + } + + /** + * + */ + @ComputeTaskSessionFullSupport + private static class CheckpointCountingTestTask extends ComputeTaskAdapter<ClusterNode, Object> { + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable ClusterNode arg) + throws IgniteCheckedException { + for (ClusterNode node : subgrid) { + if (node.id().equals(arg.id())) + return Collections.singletonMap(new ComputeJobAdapter() { + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + @Nullable @Override public Object execute() throws IgniteCheckedException { + ses.saveCheckpoint("checkpoint-key", "checkpoint-value"); + + return null; + } + }, node); + } + + assert false : "Expected node wasn't found in grid"; + + // Never accessible. 
+ return null; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java new file mode 100644 index 0000000..bf967a6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.collision.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * Job collision cancel test. + */ +@SuppressWarnings( {"PublicInnerClass"}) +@GridCommonTest(group = "Kernal Self") +public class GridJobCollisionCancelSelfTest extends GridCommonAbstractTest { + /** */ + private static final Object mux = new Object(); + + /** */ + private static final int SPLIT_COUNT = 2; + + /** */ + private static final long maxJobExecTime = 10000; + + /** */ + private static int cancelCnt; + + /** */ + private static int execCnt; + + /** */ + private static int colResolutionCnt; + + /** */ + public GridJobCollisionCancelSelfTest() { + super(true); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings( {"AssignmentToCatchBlockParameter"}) + public void testCancel() throws Exception { + Ignite ignite = G.ignite(getTestGridName()); + + ignite.compute().localDeployTask(GridCancelTestTask.class, GridCancelTestTask.class.getClassLoader()); + + ComputeTaskFuture<?> res0 = + executeAsync(ignite.compute().withTimeout(maxJobExecTime * 2), GridCancelTestTask.class.getName(), null); + + try { + Object res = res0.get(); + + info("Cancel test result: " + res); + + synchronized (mux) { + // Every execute must be called. + assert execCnt <= SPLIT_COUNT : "Invalid execute count: " + execCnt; + + // Job returns 1 if was cancelled. + assert (Integer)res <= SPLIT_COUNT : "Invalid task result: " + res; + + // Should be exactly the same as Jobs number. + assert cancelCnt <= SPLIT_COUNT : "Invalid cancel count: " + cancelCnt; + + // One per start and one per stop and some that come with heartbeats. 
+ assert colResolutionCnt > SPLIT_COUNT + 1: + "Invalid collision resolution count: " + colResolutionCnt; + } + } + catch (ComputeTaskTimeoutException e) { + error("Task execution got timed out.", e); + } + catch (Exception e) { + assert e.getCause() != null; + + if (e.getCause() instanceof IgniteCheckedException) + e = (Exception)e.getCause(); + + if (e.getCause() instanceof IOException) + e = (Exception)e.getCause(); + + assert e.getCause() instanceof InterruptedException : "Invalid exception cause: " + e.getCause(); + } + } + + /** + * @return Configuration. + * @throws Exception If failed. + */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + cfg.setCollisionSpi(new GridTestCollision()); + + return cfg; + } + + /** + * + */ + public static class GridCancelTestTask extends ComputeTaskSplitAdapter<Serializable, Object> { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Collection<? extends ComputeJob> split(int gridSize, Serializable arg) { + if (log.isInfoEnabled()) + log.info("Splitting task [task=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']'); + + Collection<GridCancelTestJob> jobs = new ArrayList<>(SPLIT_COUNT); + + for (int i = 0; i < SPLIT_COUNT; i++) + jobs.add(new GridCancelTestJob()); + + return jobs; + } + + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) { + if (log.isInfoEnabled()) + log.info("Aggregating job [job=" + this + ", results=" + results + ']'); + + int res = 0; + + for (ComputeJobResult result : results) { + assert result != null; + + if (result.getData() != null) + res += (Integer)result.getData(); + } + + return res; + } + } + + /** + * Test job. 
+ */ + public static class GridCancelTestJob extends ComputeJobAdapter { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** */ + @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"}) + private boolean isCancelled; + + /** */ + private final long thresholdTime; + + /** */ + public GridCancelTestJob() { + thresholdTime = System.currentTimeMillis() + maxJobExecTime; + } + + /** {@inheritDoc} */ + @Override public Serializable execute() { + synchronized (mux) { + execCnt++; + } + + if (log.isInfoEnabled()) + log.info("Executing job: " + jobCtx.getJobId()); + + long now = System.currentTimeMillis(); + + while (!isCancelled && now < thresholdTime) { + synchronized (mux) { + try { + mux.wait(thresholdTime - now); + } + catch (InterruptedException ignored) { + // No-op. + } + } + + now = System.currentTimeMillis(); + } + + synchronized (mux) { + return isCancelled ? 1 : 0; + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + synchronized (mux) { + isCancelled = true; + + cancelCnt++; + + mux.notifyAll(); + } + + log.warning("Job cancelled: " + jobCtx.getJobId()); + } + } + + + /** + * Test collision SPI. + */ + @IgniteSpiMultipleInstancesSupport(true) + public static class GridTestCollision extends IgniteSpiAdapter implements CollisionSpi { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void onCollision(CollisionContext ctx) { + Collection<CollisionJobContext> activeJobs = ctx.activeJobs(); + Collection<CollisionJobContext> waitJobs = ctx.waitingJobs(); + + synchronized (mux) { + colResolutionCnt++; + } + + for (CollisionJobContext job : waitJobs) + job.activate(); + + for (CollisionJobContext job : activeJobs) + job.cancel(); + } + + /** {@inheritDoc} */ + @Override public void spiStart(String gridName) throws IgniteSpiException { + // Start SPI start stopwatch. 
+ startStopwatch(); + + // Ack start. + if (log.isInfoEnabled()) + log.info(startInfo()); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + // Ack stop. + if (log.isInfoEnabled()) + log.info(stopInfo()); + } + + /** {@inheritDoc} */ + @Override public void setExternalCollisionListener(CollisionExternalListener lsnr) { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobContextSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobContextSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobContextSelfTest.java new file mode 100644 index 0000000..d0c1a70 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobContextSelfTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * Job context test. + */ +@GridCommonTest(group = "Kernal Self") +public class GridJobContextSelfTest extends GridCommonAbstractTest { + /** + * @throws Exception If anything failed. + */ + public void testJobContext() throws Exception { + Ignite ignite = startGrid(1); + + try { + startGrid(2); + + try { + ignite.compute().execute(JobContextTask.class, null); + } + finally { + stopGrid(2); + } + } + finally{ + stopGrid(1); + } + } + + /** */ + @SuppressWarnings("PublicInnerClass") + public static class JobContextTask extends ComputeTaskSplitAdapter<Object, Object> { + @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException { + Collection<ComputeJobAdapter> jobs = new ArrayList<>(gridSize); + + for (int i = 0; i < gridSize; i++) { + jobs.add(new ComputeJobAdapter() { + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** Ignite instance. 
*/ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public Serializable execute() { + UUID locNodeId = ignite.configuration().getNodeId(); + + jobCtx.setAttribute("nodeId", locNodeId); + jobCtx.setAttribute("jobId", jobCtx.getJobId()); + + Map<String, String> attrs = new HashMap<>(10); + + for (int i = 0; i < 10; i++) { + String s = jobCtx.getJobId().toString() + i; + + attrs.put(s, s); + } + + jobCtx.setAttributes(attrs); + + assert jobCtx.getAttribute("nodeId").equals(locNodeId); + assert jobCtx.getAttributes().get("nodeId").equals(locNodeId); + assert jobCtx.getAttributes().keySet().containsAll(attrs.keySet()); + assert jobCtx.getAttributes().values().containsAll(attrs.values()); + + return null; + } + }); + } + + return jobs; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + for (ComputeJobResult res : results) { + ComputeJobContext jobCtx = res.getJobContext(); + + assert jobCtx.getAttribute("nodeId").equals(res.getNode().id()); + assert jobCtx.getAttributes().get("nodeId").equals(res.getNode().id()); + + assert jobCtx.getAttribute("jobId").equals(jobCtx.getJobId()); + + for (int i = 0; i < 10; i++) { + String s = jobCtx.getJobId().toString() + i; + + assert jobCtx.getAttribute(s).equals(s); + assert jobCtx.getAttributes().get(s).equals(s); + } + } + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java new file mode 100644 index 0000000..cb51453 --- /dev/null +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java @@ -0,0 +1,802 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static 
java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Tests behavior of jobs whose class implements {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} when the master node + * leaves the topology. + */ +@GridCommonTest(group = "Task Session") +public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest { + /** Total grid count within the cloud. */ + private static final int GRID_CNT = 2; + + /** Default IP finder for single-JVM cloud grid. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Counted down each time a master-leave callback is invoked. */ + private static volatile CountDownLatch invokeLatch; + + /** Latch which blocks job execution until main thread has sent node fail signal. */ + private static volatile CountDownLatch latch; + + /** Latch which blocks main thread until all jobs start their execution. */ + private static volatile CountDownLatch jobLatch; + + /** Whether jobs wait for the master-leave callback before completing execution.
*/ + private static volatile boolean awaitMasterLeaveCallback = true; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + awaitMasterLeaveCallback = true; + latch = new CountDownLatch(1); + jobLatch = new CountDownLatch(GRID_CNT - 1); + invokeLatch = new CountDownLatch(GRID_CNT - 1); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCommunicationSpi(new CommunicationSpi()); + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * Get predicate which allows task execution on all nodes except the last one. + * + * @return Predicate. + */ + private IgnitePredicate<ClusterNode> excludeLastPredicate() { + return new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode e) { + return !e.id().equals(grid(GRID_CNT - 1).localNode().id()); + } + }; + } + + /** + * Constructor. + */ + public GridJobMasterLeaveAwareSelfTest() { + super(/* don't start grid */ false); + } + + /** + * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked on job which is initiated by + * master and is currently running on it. + * + * @throws Exception If failed. + */ + public void testLocalJobOnMaster() throws Exception { + invokeLatch = new CountDownLatch(1); + jobLatch = new CountDownLatch(1); + + Ignite g = startGrid(0); + + g.compute().enableAsync().execute(new TestTask(1), null); + + jobLatch.await(); + + // Count down the latch in a separate thread (NOTE(review): presumably because stopGrid() below may block until the local job waiting on it completes).
+ new Thread(new Runnable() { + @Override public void run() { + try { + U.sleep(500); + } + catch (IgniteInterruptedException ignore) { + // No-op. + } + + latch.countDown(); + } + }).start(); + + stopGrid(0, true); + + latch.countDown(); + + assertTrue(invokeLatch.await(5000, MILLISECONDS)); + } + + /** + * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked when master node leaves topology normally. + * + * @throws Exception If failed. + */ + public void testMasterStoppedNormally() throws Exception { + // Start grids. + for (int i = 0; i < GRID_CNT; i++) + startGrid(i); + + int lastGridIdx = GRID_CNT - 1; + + compute(grid(lastGridIdx).forPredicate(excludeLastPredicate())).enableAsync(). + execute(new TestTask(GRID_CNT - 1), null); + + jobLatch.await(); + + stopGrid(lastGridIdx, true); + + latch.countDown(); + + assertTrue(invokeLatch.await(5000, MILLISECONDS)); + } + + /** + * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked when master node leaves topology + * abruptly (e.g. due to a network failure or immediate node shutdown). + * + * @throws Exception If failed. + */ + public void testMasterStoppedAbruptly() throws Exception { + // Start grids. + for (int i = 0; i < GRID_CNT; i++) + startGrid(i); + + int lastGridIdx = GRID_CNT - 1; + + compute(grid(lastGridIdx).forPredicate(excludeLastPredicate())).enableAsync(). + execute(new TestTask(GRID_CNT - 1), null); + + jobLatch.await(); + + ((CommunicationSpi)grid(lastGridIdx).configuration().getCommunicationSpi()).blockMessages(); + + stopGrid(lastGridIdx, true); + + latch.countDown(); + + assertTrue(invokeLatch.await(5000, MILLISECONDS)); + } + + /** + * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked when a worker node fails to send + * {@link GridJobExecuteResponse} to master node. + * + * @throws Exception If failed.
+ */ + public void testCannotSendJobExecuteResponse() throws Exception { + awaitMasterLeaveCallback = false; + + // Start grids. + for (int i = 0; i < GRID_CNT; i++) + startGrid(i); + + int lastGridIdx = GRID_CNT - 1; + + compute(grid(lastGridIdx).forPredicate(excludeLastPredicate())).enableAsync(). + execute(new TestTask(GRID_CNT - 1), null); + + jobLatch.await(); + + for (int i = 0; i < lastGridIdx; i++) + ((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).waitLatch(); + + latch.countDown(); + + // Ensure that all worker nodes have already started sending the job response. + for (int i = 0; i < lastGridIdx; i++) + ((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).awaitResponse(); + + // Now we stop master grid. + stopGrid(lastGridIdx, true); + + // Release communication SPI wait latches. As master node is stopped, job worker will receive an exception. + for (int i = 0; i < lastGridIdx; i++) + ((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).releaseWaitLatch(); + + assertTrue(invokeLatch.await(5000, MILLISECONDS)); + } + + /** + * @throws Exception If failed. + */ + public void testApply1() throws Exception { + testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup grid) throws IgniteCheckedException { + IgniteCompute comp = compute(grid).enableAsync(); + + comp.apply(new TestClosure(), "arg"); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testApply2() throws Exception { + testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup grid) throws IgniteCheckedException { + IgniteCompute comp = compute(grid).enableAsync(); + + comp.apply(new TestClosure(), Arrays.asList("arg1", "arg2")); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed.
+ */ + public void testApply3() throws Exception { + testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup grid) throws IgniteCheckedException { + IgniteCompute comp = compute(grid).enableAsync(); + + comp.apply(new TestClosure(), + Arrays.asList("arg1", "arg2"), + new IgniteReducer<Void, Object>() { + @Override public boolean collect(@Nullable Void aVoid) { + return true; + } + + @Override public Object reduce() { + return null; + } + }); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testRun1() throws Exception { + testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.run(new TestRunnable()); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testRun2() throws Exception { + testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.run(Arrays.asList(new TestRunnable(), new TestRunnable())); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testCall1() throws Exception { + testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.call(new TestCallable()); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed.
+ */ + public void testCall2() throws Exception { + testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.call(Arrays.asList(new TestCallable(), new TestCallable())); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testCall3() throws Exception { + testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.call( + Arrays.asList(new TestCallable(), new TestCallable()), + new IgniteReducer<Void, Object>() { + @Override public boolean collect(@Nullable Void aVoid) { + return true; + } + + @Override public Object reduce() { + return null; + } + }); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testBroadcast1() throws Exception { + testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.broadcast(new TestRunnable()); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testBroadcast2() throws Exception { + testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.broadcast(new TestCallable()); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed.
+ */ + public void testBroadcast3() throws Exception { + testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.broadcast(new TestClosure(), "arg"); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityRun() throws Exception { + testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { + IgniteCompute comp = compute(prj).enableAsync(); + + GridCacheAffinity<Object> aff = prj.ignite().cache(null).affinity(); + + ClusterNode node = F.first(prj.nodes()); + + comp.affinityRun(null, keyForNode(aff, node), new TestRunnable()); + + return comp.future(); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityCall() throws Exception { + testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { + @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { + IgniteCompute comp = compute(prj).enableAsync(); + + GridCacheAffinity<Object> aff = prj.ignite().cache(null).affinity(); + + ClusterNode node = F.first(prj.nodes()); + + comp.affinityCall(null, keyForNode(aff, node), new TestCallable()); + + return comp.future(); + } + }); + } + + /** + * @param aff Cache affinity. + * @param node Node. + * @return Some integer key in {@code [0, 1000)} for which the given node is primary (the test fails if none is found). + */ + private Object keyForNode(GridCacheAffinity<Object> aff, ClusterNode node) { + assertNotNull(node); + + Object key = null; + + for (int i = 0; i < 1000; i++) { + if (aff.isPrimary(node, i)) { + key = i; + + break; + } + } + + assertNotNull(key); + + return key; + } + + /** + * @param expJobs Expected number of jobs (and of master-leave callback invocations). + * @param taskStarter Task starter.
+ * @throws Exception If failed.
+ */ + private void testMasterLeaveAwareCallback(int expJobs, IgniteClosure<ClusterGroup, IgniteFuture<?>> taskStarter) + throws Exception { + jobLatch = new CountDownLatch(expJobs); + invokeLatch = new CountDownLatch(expJobs); + + for (int i = 0; i < GRID_CNT; i++) + startGrid(i); + + int lastGridIdx = GRID_CNT - 1; + + IgniteFuture<?> fut = taskStarter.apply(grid(lastGridIdx).forPredicate(excludeLastPredicate())); + + jobLatch.await(); + + stopGrid(lastGridIdx, true); + + latch.countDown(); + + assertTrue(invokeLatch.await(5000, MILLISECONDS)); + + try { + fut.get(); + } + catch (IgniteCheckedException e) { + log.debug("Task failed: " + e); + } + } + + /** Shared job/callback logic: execute() blocks on the test latches, onMasterLeave() records the callback and releases it. + */ + private static class TestMasterLeaveAware { + /** Counted down by {@link #onMasterLeave}; awaited in {@link #execute} when {@code awaitMasterLeaveCallback} is set. */ + private final CountDownLatch latch0 = new CountDownLatch(1); + + /** + * @param log Logger. + */ + private void execute(IgniteLogger log) { + try { + log.info("Started execute."); + + // Count down the shared job latch so that the main thread knows that all jobs are + // inside the "execute" routine. + jobLatch.countDown(); + + log.info("After job latch."); + + // Await for the main thread to allow jobs to proceed. + latch.await(); + + log.info("After latch."); + + if (awaitMasterLeaveCallback) { + latch0.await(); + + log.info("After latch0."); + } + else + log.info("Latch 0 skipped."); + } + catch (InterruptedException e) { + // We do not expect any interruptions here, hence this statement. + fail("Unexpected exception: " + e); + } + } + + /** + * @param log Logger. + * @param job Actual job. + */ + private void onMasterLeave(IgniteLogger log, Object job) { + log.info("Callback executed: " + job); + + latch0.countDown(); + + invokeLatch.countDown(); + } + } + + /** + * Master leave aware callable. + */ + private static class TestCallable implements Callable<Void>, ComputeJobMasterLeaveAware { + /** Injected logger.
*/ + @IgniteLoggerResource + private IgniteLogger log; + + /** Shared execute/callback logic. */ + private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware(); + + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + masterLeaveAware.execute(log); + + return null; + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + masterLeaveAware.onMasterLeave(log, this); + } + } + + /** + * Master leave aware runnable. + */ + private static class TestRunnable implements Runnable, ComputeJobMasterLeaveAware { + /** Injected logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Shared execute/callback logic. */ + private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware(); + + /** {@inheritDoc} */ + @Override public void run() { + masterLeaveAware.execute(log); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + masterLeaveAware.onMasterLeave(log, this); + } + } + + /** + * Master leave aware closure. + */ + private static class TestClosure implements IgniteClosure<String, Void>, ComputeJobMasterLeaveAware { + /** Injected logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Shared execute/callback logic. */ + private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware(); + + /** {@inheritDoc} */ + @Override public Void apply(String arg) { + masterLeaveAware.execute(log); + + return null; + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + masterLeaveAware.onMasterLeave(log, this); + } + } + + /** + * Base implementation of dummy task which produces a predefined number of test jobs on split stage. + */ + private static class TestTask extends ComputeTaskSplitAdapter<String, Integer> { + /** How many jobs to produce. */ + private int jobCnt; + + /** Injected task session (not used by this task). */ + @IgniteTaskSessionResource + private ComputeTaskSession taskSes; + + /** + * Constructor.
+ * + * @param jobCnt How many jobs to produce on split stage. + */ + private TestTask(int jobCnt) { + this.jobCnt = jobCnt; + } + + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteCheckedException { + Collection<ComputeJobAdapter> jobs = new ArrayList<>(jobCnt); + + for (int i = 0; i < jobCnt; i++) + jobs.add(new TestJob()); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } + + /** + * Base implementation of dummy test job. + */ + private static class TestJob extends ComputeJobAdapter implements ComputeJobMasterLeaveAware { + /** Injected logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Shared execute/callback logic. */ + private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware(); + + /** + * Constructor. + */ + private TestJob() { + super(new Object()); + } + + /** {@inheritDoc} */ + @Override public Object execute() throws IgniteCheckedException { + masterLeaveAware.execute(log); + + return null; + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + masterLeaveAware.onMasterLeave(log, this); + } + } + + /** + * Communication SPI which could optionally block outgoing messages. + */ + private static class CommunicationSpi extends TcpCommunicationSpi { + /** Whether to block all outgoing messages. */ + private volatile boolean block; + + /** Counted down when a job execution response is about to be sent from this node. */ + private final CountDownLatch respLatch = new CountDownLatch(1); + + /** Whether to wait for a wait latch before returning. */ + private volatile boolean wait; + + /** Wait latch; job execution responses wait on it while {@code wait} is set.
*/ + private final CountDownLatch waitLatch = new CountDownLatch(1); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) + throws IgniteSpiException { + sendMessage0(node, msg); + } + + /** + * Sends the message unless {@link #blockMessages()} was called (then it is silently dropped). For + * {@link GridJobExecuteResponse} messages, first counts down the response latch and, if waiting is enabled, awaits the wait latch. + * + * @param node Destination node. + * @param msg Message to be sent. + * @throws org.apache.ignite.spi.IgniteSpiException If failed. + */ + private void sendMessage0(ClusterNode node, GridTcpCommunicationMessageAdapter msg) throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + GridIoMessage msg0 = (GridIoMessage)msg; + + if (msg0.message() instanceof GridJobExecuteResponse) { + respLatch.countDown(); + + if (wait) { + try { + U.await(waitLatch); + } + catch (IgniteInterruptedException ignore) { + // No-op. + } + } + } + } + + if (!block) + super.sendMessage(node, msg); + } + + /** + * Drop all subsequent outgoing messages. + */ + void blockMessages() { + block = true; + } + + /** + * Make subsequent job execution responses wait on the wait latch before being sent. + */ + private void waitLatch() { + wait = true; + } + + /** + * Count down wait latch. + */ + private void releaseWaitLatch() { + waitLatch.countDown(); + } + + /** + * Await until a job execution response is about to be sent from this node. + * + * @throws org.apache.ignite.IgniteInterruptedException If interrupted.
+ */ + private void awaitResponse() throws IgniteInterruptedException { + U.await(respLatch); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java new file mode 100644 index 0000000..d918ccb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.collision.jobstealing.*; +import org.apache.ignite.spi.failover.jobstealing.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.config.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Job stealing test. + */ +@SuppressWarnings("unchecked") +@GridCommonTest(group = "Kernal Self") +public class GridJobStealingSelfTest extends GridCommonAbstractTest { + /** Task execution timeout in milliseconds. */ + private static final int TASK_EXEC_TIMEOUT_MS = 50000; + + /** */ + private Ignite ignite1; + + /** */ + private Ignite ignite2; + + /** Job distribution map. Records which job has run on which node. */ + private static Map<UUID, Collection<ComputeJob>> jobDistrMap = new HashMap<>(); + + /** */ + public GridJobStealingSelfTest() { + super(false /* don't start grid*/); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + jobDistrMap.clear(); + + ignite1 = startGrid(1); + + ignite2 = startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + ignite1 = null; + ignite2 = null; + } + + /** + * Test 2 jobs on 1 node. + * + * @throws IgniteCheckedException If test failed. + */ + public void testTwoJobs() throws IgniteCheckedException { + executeAsync(ignite1.compute(), new JobStealingSingleNodeTask(2), null).get(TASK_EXEC_TIMEOUT_MS); + + // Verify that 1 job was stolen by second node. 
+ assertEquals(2, jobDistrMap.keySet().size()); + assertEquals(1, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); + assertEquals(1, jobDistrMap.get(ignite2.cluster().localNode().id()).size()); + } + + /** + * Test 2 jobs on 1 node with null predicate. + * + * @throws IgniteCheckedException If test failed. + */ + @SuppressWarnings("NullArgumentToVariableArgMethod") + public void testTwoJobsNullPredicate() throws IgniteCheckedException { + executeAsync(ignite1.compute(), new JobStealingSingleNodeTask(2), null).get(TASK_EXEC_TIMEOUT_MS); + + // Verify that 1 job was stolen by second node. + assertEquals(2, jobDistrMap.keySet().size()); + assertEquals(1, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); + assertEquals(1, jobDistrMap.get(ignite2.cluster().localNode().id()).size()); + } + + /** + * Test 2 jobs on 1 node with null predicate using string task name. + * + * @throws IgniteCheckedException If test failed. + */ + @SuppressWarnings("NullArgumentToVariableArgMethod") + public void testTwoJobsTaskNameNullPredicate() throws IgniteCheckedException { + executeAsync(ignite1.compute(), JobStealingSingleNodeTask.class.getName(), null).get(TASK_EXEC_TIMEOUT_MS); + + // Verify that 1 job was stolen by second node. + assertEquals(2, jobDistrMap.keySet().size()); + assertEquals(1, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); + assertEquals(1, jobDistrMap.get(ignite2.cluster().localNode().id()).size()); + } + + /** + * Test 2 jobs on 1 node when one of the predicates is null. + * + * @throws IgniteCheckedException If test failed. + */ + @SuppressWarnings("unchecked") + public void testTwoJobsPartiallyNullPredicate() throws IgniteCheckedException { + IgnitePredicate<ClusterNode> topPred = new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode e) { + return ignite2.cluster().localNode().id().equals(e.id()); // Limit projection with only grid2. 
+ } + }; + + executeAsync(compute(ignite1.cluster().forPredicate(topPred)).withTimeout(TASK_EXEC_TIMEOUT_MS), + new JobStealingSpreadTask(2), null).get(TASK_EXEC_TIMEOUT_MS); + + assertEquals(1, jobDistrMap.keySet().size()); + assertEquals(2, jobDistrMap.get(ignite2.cluster().localNode().id()).size()); + assertFalse(jobDistrMap.containsKey(ignite1.cluster().localNode().id())); + } + + /** + * Tests that projection predicate is taken into account by Stealing SPI. + * + * @throws Exception If failed. + */ + public void testProjectionPredicate() throws Exception { + final Ignite ignite3 = startGrid(3); + + executeAsync(compute(ignite1.cluster().forPredicate(new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode e) { + return ignite1.cluster().localNode().id().equals(e.id()) || + ignite3.cluster().localNode().id().equals(e.id()); // Limit projection with only grid1 or grid3 node. + } + })), new JobStealingSpreadTask(4), null).get(TASK_EXEC_TIMEOUT_MS); + + // Verify that jobs were run only on grid1 and grid3 (not on grid2) + assertEquals(2, jobDistrMap.keySet().size()); + assertEquals(2, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); + assertEquals(2, jobDistrMap.get(ignite3.cluster().localNode().id()).size()); + assertFalse(jobDistrMap.containsKey(ignite2.cluster().localNode().id())); + } + + /** + * Tests that projection predicate is taken into account by Stealing SPI, + * and that jobs in projection can steal tasks from each other. + * + * @throws Exception If failed. + */ + public void testProjectionPredicateInternalStealing() throws Exception { + final Ignite ignite3 = startGrid(3); + + IgnitePredicate<ClusterNode> p = new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode e) { + return ignite1.cluster().localNode().id().equals(e.id()) || + ignite3.cluster().localNode().id().equals(e.id()); // Limit projection with only grid1 or grid3 node. 
+ } + }; + + executeAsync(compute(ignite1.cluster().forPredicate(p)), new JobStealingSingleNodeTask(4), null).get(TASK_EXEC_TIMEOUT_MS); + + // Verify that jobs were run only on grid1 and grid3 (not on grid2) + assertEquals(2, jobDistrMap.keySet().size()); + assertFalse(jobDistrMap.containsKey(ignite2.cluster().localNode().id())); + } + + /** + * Tests that a job is not cancelled if there are no + * available thief nodes in topology. + * + * @throws Exception If failed. + */ + public void testSingleNodeTopology() throws Exception { + IgnitePredicate<ClusterNode> p = new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode e) { + return ignite1.cluster().localNode().id().equals(e.id()); // Limit projection with only grid1 node. + } + }; + + executeAsync(compute(ignite1.cluster().forPredicate(p)), new JobStealingSpreadTask(2), null). + get(TASK_EXEC_TIMEOUT_MS); + + assertEquals(1, jobDistrMap.keySet().size()); + assertEquals(2, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); + } + + /** + * Tests that a job is not cancelled if there are no + * available thief nodes in projection. + * + * @throws Exception If failed. + */ + public void testSingleNodeProjection() throws Exception { + ClusterGroup prj = ignite1.cluster().forNodeIds(Collections.singleton(ignite1.cluster().localNode().id())); + + executeAsync(compute(prj), new JobStealingSpreadTask(2), null).get(TASK_EXEC_TIMEOUT_MS); + + assertEquals(1, jobDistrMap.keySet().size()); + assertEquals(2, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); + } + + /** + * Tests that a job is not cancelled if there are no + * available thief nodes in projection. Uses null predicate. + * + * @throws Exception If failed. 
+ */ + @SuppressWarnings("NullArgumentToVariableArgMethod") + public void testSingleNodeProjectionNullPredicate() throws Exception { + ClusterGroup prj = ignite1.cluster().forNodeIds(Collections.singleton(ignite1.cluster().localNode().id())); + + executeAsync(compute(prj).withTimeout(TASK_EXEC_TIMEOUT_MS), new JobStealingSpreadTask(2), null). + get(TASK_EXEC_TIMEOUT_MS); + + assertEquals(1, jobDistrMap.keySet().size()); + assertEquals(2, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); + } + + /** + * Tests job stealing with peer deployment and different class loaders. + * + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testProjectionPredicateDifferentClassLoaders() throws Exception { + final Ignite ignite3 = startGrid(3); + + URL[] clsLdrUrls; + try { + clsLdrUrls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))}; + } + catch (MalformedURLException e) { + throw new RuntimeException("Define property p2p.uri.cls", e); + } + + ClassLoader ldr1 = new URLClassLoader(clsLdrUrls, getClass().getClassLoader()); + + Class taskCls = ldr1.loadClass("org.gridgain.grid.tests.p2p.JobStealingTask"); + Class nodeFilterCls = ldr1.loadClass("org.gridgain.grid.tests.p2p.GridExcludeNodeFilter"); + + IgnitePredicate<ClusterNode> nodeFilter = (IgnitePredicate<ClusterNode>)nodeFilterCls + .getConstructor(UUID.class).newInstance(ignite2.cluster().localNode().id()); + + Map<UUID, Integer> ret = (Map<UUID, Integer>)executeAsync(compute(ignite1.cluster().forPredicate(nodeFilter)), + taskCls, null).get(TASK_EXEC_TIMEOUT_MS); + + assert ret != null; + assert ret.get(ignite1.cluster().localNode().id()) != null && ret.get(ignite1.cluster().localNode().id()) == 2 : + ret.get(ignite1.cluster().localNode().id()); + assert ret.get(ignite3.cluster().localNode().id()) != null && ret.get(ignite3.cluster().localNode().id()) == 2 : + ret.get(ignite3.cluster().localNode().id()); + } + + /** {@inheritDoc} */ + @Override protected 
IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi(); + + // One job at a time. + colSpi.setActiveJobsThreshold(1); + colSpi.setWaitJobsThreshold(0); + + JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi(); + + // Verify defaults. + assert failSpi.getMaximumFailoverAttempts() == JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS; + + cfg.setCollisionSpi(colSpi); + cfg.setFailoverSpi(failSpi); + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + return cfg; + } + + /** + * Job stealing task, that spreads jobs equally over the grid. + */ + private static class JobStealingSpreadTask extends ComputeTaskAdapter<Object, Object> { + /** Grid. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Number of jobs to spawn from task. */ + protected final int nJobs; + + /** + * Constructs a new task instance. + * + * @param nJobs Number of jobs to spawn from this task. + */ + JobStealingSpreadTask(int nJobs) { + this.nJobs = nJobs; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteCheckedException { + //assert subgrid.size() == 2 : "Invalid subgrid size: " + subgrid.size(); + + Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size()); + + Iterator<ClusterNode> subIter = subgrid.iterator(); + + // Spread jobs over subgrid. 
+ for (int i = 0; i < nJobs; i++) { + if (!subIter.hasNext()) + subIter = subgrid.iterator(); // wrap around + + map.put(new GridJobStealingJob(5000L), subIter.next()); + } + + return map; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SuspiciousMethodCalls") + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + for (ComputeJobResult res : results) { + log.info("Job result: " + res.getData()); + } + + return null; + } + } + + /** + * Job stealing task, that puts all jobs onto one node. + */ + private static class JobStealingSingleNodeTask extends JobStealingSpreadTask { + /** {@inheritDoc} */ + JobStealingSingleNodeTask(int nJobs) { + super(nJobs); + } + + /** + * Default constructor. + * + * Uses 2 jobs. + */ + JobStealingSingleNodeTask() { + super(2); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteCheckedException { + assert subgrid.size() > 1 : "Invalid subgrid size: " + subgrid.size(); + + Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size()); + + // Put all jobs onto one node. + for (int i = 0; i < nJobs; i++) + map.put(new GridJobStealingJob(5000L), subgrid.get(0)); + + return map; + } + } + + /** + * Job stealing job. + */ + private static final class GridJobStealingJob extends ComputeJobAdapter { + /** Injected grid. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** + * @param arg Job argument. 
+ */ + GridJobStealingJob(Long arg) { + super(arg); + } + + /** {@inheritDoc} */ + @Override public Serializable execute() throws IgniteCheckedException { + log.info("Started job on node: " + ignite.cluster().localNode().id()); + + if (!jobDistrMap.containsKey(ignite.cluster().localNode().id())) { + Collection<ComputeJob> jobs = new ArrayList<>(); + jobs.add(this); + + jobDistrMap.put(ignite.cluster().localNode().id(), jobs); + } + else + jobDistrMap.get(ignite.cluster().localNode().id()).add(this); + + try { + Long sleep = argument(0); + + assert sleep != null; + + Thread.sleep(sleep); + } + catch (InterruptedException e) { + log.info("Job got interrupted on node: " + ignite.cluster().localNode().id()); + + throw new IgniteCheckedException("Job got interrupted.", e); + } + finally { + log.info("Job finished on node: " + ignite.cluster().localNode().id()); + } + + return ignite.cluster().localNode().id(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingZeroActiveJobsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingZeroActiveJobsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingZeroActiveJobsSelfTest.java new file mode 100644 index 0000000..de0d669 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingZeroActiveJobsSelfTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.collision.jobstealing.*;
import org.apache.ignite.spi.failover.jobstealing.*;
import org.apache.ignite.testframework.junits.common.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;

/**
 * Job stealing test.
 * <p>
 * Grid 1 is configured with an active-jobs threshold of {@code 0}, grid 2 with {@code 2}.
 * A task started on grid 1 maps all of its jobs onto grid 1; the reduce step then asserts
 * that every job actually ran on grid 2.
 */
@GridCommonTest(group = "Kernal Self")
public class GridJobStealingZeroActiveJobsSelfTest extends GridCommonAbstractTest {
    /** First grid (active-jobs threshold 0); the task is started from here. */
    private static Ignite ignite1;

    /** Second grid (active-jobs threshold 2); expected to execute all jobs. */
    private static Ignite ignite2;

    /** Creates the test without auto-starting a grid; grids are started in {@link #beforeTest()}. */
    public GridJobStealingZeroActiveJobsSelfTest() {
        super(false /* don't start grid*/);
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        ignite1 = startGrid(1);
        ignite2 = startGrid(2);
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        // Drop both static references so stopped grids are not retained between tests.
        ignite1 = null;
        ignite2 = null;

        // Make sure the second grid is stopped even if stopping the first one fails.
        try {
            stopGrid(1);
        }
        finally {
            stopGrid(2);
        }
    }

    /**
     * Test 2 jobs on 2 nodes.
     *
     * @throws IgniteCheckedException If test failed.
     */
    public void testTwoJobs() throws IgniteCheckedException {
        ignite1.compute().execute(JobStealingTask.class, null);
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi();

        // Grid 1 gets an active-jobs threshold of 0 so its jobs must be stolen;
        // every other grid may run up to 2 jobs concurrently.
        colSpi.setActiveJobsThreshold(gridName.endsWith("1") ? 0 : 2);
        colSpi.setWaitJobsThreshold(0);

        JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi();

        // Verify defaults.
        assert failSpi.getMaximumFailoverAttempts() == JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS;

        cfg.setCollisionSpi(colSpi);
        cfg.setFailoverSpi(failSpi);

        return cfg;
    }

    /**
     * Task that maps one job per subgrid node, but places all of them on the local node,
     * and verifies in {@link #reduce(List)} that they were all executed by grid 2.
     */
    @SuppressWarnings({"PublicInnerClass"})
    public static class JobStealingTask extends ComputeTaskAdapter<Object, Object> {
        /** Grid. */
        @IgniteInstanceResource
        private Ignite ignite;

        /** Logger. */
        @IgniteLoggerResource
        private IgniteLogger log;

        /** {@inheritDoc} */
        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg)
            throws IgniteCheckedException {
            assert subgrid.size() == 2 : "Invalid subgrid size: " + subgrid.size();

            Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size());

            // Put one job per subgrid node, all onto the local node.
            for (int i = 0; i < subgrid.size(); i++)
                map.put(new GridJobStealingJob(5000L), ignite.cluster().localNode());

            return map;
        }

        /** {@inheritDoc} */
        @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
            assert results.size() == 2;

            for (ComputeJobResult res : results) {
                log.info("Job result: " + res.getData());
            }

            // Each job returns the name of the grid that executed it.
            String name1 = results.get(0).getData();
            String name2 = results.get(1).getData();

            // Both jobs must have run on the same grid...
            assert name1.equals(name2);

            // ...and that grid must be grid 2, not the zero-threshold grid 1.
            assert !name1.equals(ignite1.name());
            assert name1.equals(ignite2.name());

            return null;
        }
    }

    /**
     * Job that sleeps for the number of milliseconds given as its argument and
     * returns the name of the grid it ran on.
     */
    @SuppressWarnings({"PublicInnerClass"})
    public static final class GridJobStealingJob extends ComputeJobAdapter {
        /** Injected grid. */
        @IgniteInstanceResource
        private Ignite ignite;

        /**
         * @param arg Job argument: sleep duration in milliseconds.
         */
        GridJobStealingJob(Long arg) {
            super(arg);
        }

        /** {@inheritDoc} */
        @Override public Serializable execute() throws IgniteCheckedException {
            try {
                Long sleep = argument(0);

                assert sleep != null;

                Thread.sleep(sleep);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt status for the executing thread before failing the job.
                Thread.currentThread().interrupt();

                throw new IgniteCheckedException("Job got interrupted.", e);
            }

            return ignite.name();
        }
    }
}