http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridStartStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridStartStopSelfTest.java new file mode 100644 index 0000000..773cf28 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridStartStopSelfTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Checks basic node start/stop operations. + */ +@SuppressWarnings({"CatchGenericClass", "InstanceofCatchParameter"}) +@GridCommonTest(group = "Kernal Self") +public class GridStartStopSelfTest extends GridCommonAbstractTest { + /** */ + public static final int COUNT = 1; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + System.setProperty(GG_OVERRIDE_MCAST_GRP, GridTestUtils.getNextMulticastGroup(GridStartStopSelfTest.class)); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop() throws Exception { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setRestEnabled(false); + + info("Grid start-stop test count: " + COUNT); + + for (int i = 0; i < COUNT; i++) { + info("Starting grid."); + + try (Ignite g = G.start(cfg)) { + assert g != null; + + info("Stopping grid " + g.cluster().localNode().id()); + } + } + } + + /** + * TODO: GG-7704 + * @throws Exception If failed. 
+ */ + public void _testStopWhileInUse() throws Exception { + /* NOTE(review): the leading underscore presumably keeps this method out of the test run - confirm before renaming. */ + /* First node: grid name 0, REST disabled, one TRANSACTIONAL cache. */ + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setRestEnabled(false); + + cfg.setGridName(getTestGridName(0)); + + CacheConfiguration cc = new CacheConfiguration(); + + cc.setAtomicityMode(TRANSACTIONAL); + + cfg.setCacheConfiguration(cc); + + final Ignite g0 = G.start(cfg); + + /* Second node: grid name 1, one TRANSACTIONAL cache (setRestEnabled is not called for it). */ + cfg = new IgniteConfiguration(); + + cfg.setGridName(getTestGridName(1)); + + cc = new CacheConfiguration(); + + cc.setAtomicityMode(TRANSACTIONAL); + + cfg.setCacheConfiguration(cc); + + final CountDownLatch latch = new CountDownLatch(1); + + Ignite g1 = G.start(cfg); + + /* Background thread: opens a PESSIMISTIC / REPEATABLE_READ transaction through g0, reads key 1, releases the latch, sleeps 500 ms and then stops grid 1 while that transaction is still open. Any exception is only logged. */ + Thread stopper = new Thread(new Runnable() { + @Override public void run() { + try { + try (IgniteTx ignored = g0.cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { + g0.cache(null).get(1); + + latch.countDown(); + + Thread.sleep(500); + + info("Before stop."); + + G.stop(getTestGridName(1), true); + } + } + catch (Exception e) { + error("Error.", e); + } + } + }); + + stopper.start(); + + /* Wait (at most one second) until the transaction has read key 1, then remove the same key through g1. */ + assert latch.await(1, SECONDS); + + info("Before remove."); + + g1.cache(null).remove(1); + /* NOTE(review): the stopper thread is never joined and g0 is not stopped in this method, so the method can return before grid 1 has been stopped - confirm before re-enabling the test. */ + } + + /** + * @throws Exception If failed. 
+ */ + public void testStoppedState() throws Exception { + /* Start a node with REST disabled, stop it by name (second argument of G.stop is true) and then call methods on the stale Ignite reference. */ + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setRestEnabled(false); + + Ignite ignite = G.start(cfg); + + assert ignite != null; + + G.stop(ignite.name(), true); + + /* NOTE(review): unlike the three checks below, this one has no 'assert false', so it also passes when localNode() does not throw - confirm whether that is intentional. */ + try { + ignite.cluster().localNode(); + } + catch (Exception e) { + assert e instanceof IllegalStateException : "Wrong exception type."; + } + + /* Each remaining call must throw IllegalStateException. The 'assert false' raises AssertionError, which is an Error and therefore is not swallowed by 'catch (Exception e)'. */ + try { + ignite.cluster().nodes(); + + assert false; + } + catch (Exception e) { + assert e instanceof IllegalStateException : "Wrong exception type."; + } + + try { + ignite.cluster().forRemotes(); + + assert false; + } + catch (Exception e) { + assert e instanceof IllegalStateException : "Wrong exception type."; + } + + try { + ignite.compute().localTasks(); + + assert false; + } + catch (Exception e) { + assert e instanceof IllegalStateException : "Wrong exception type."; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridStartupMain.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridStartupMain.java b/modules/core/src/test/java/org/apache/ignite/internal/GridStartupMain.java new file mode 100644 index 0000000..ae3728a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridStartupMain.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.internal.util.typedef.*; + +import javax.swing.*; + +/** + * GridGain startup. + */ +public class GridStartupMain { + /** + * @param args Arguments. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + //resetLog4j("org.apache.ignite.internal.processors.cache.distributed.dht.preloader", Level.DEBUG, false, 0); + + //G.start("modules/tests/config/spring-multicache.xml"); + //G.start("examples/config/example-cache.xml"); + + G.start(); + + // Wait until Ok is pressed. 
+ JOptionPane.showMessageDialog( + null, + new JComponent[] { + new JLabel("GridGain started."), + new JLabel( + "<html>" + + "You can use JMX console at <u>http://localhost:1234</u>" + + "</html>"), + new JLabel("Press OK to stop GridGain.") + }, + "GridGain Startup JUnit", + JOptionPane.INFORMATION_MESSAGE + ); + + G.stop(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridStartupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridStartupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridStartupTest.java new file mode 100644 index 0000000..160f0f8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridStartupTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.swing.*; + +/** + * GridGain startup. 
+ */ +@SuppressWarnings({"ProhibitedExceptionDeclared"}) +@GridCommonTest(group = "Kernal") +public class GridStartupTest extends GridCommonAbstractTest { + /** */ + public GridStartupTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } + + /** + * @throws Exception If failed. + */ + public void testStartup() throws Exception { + //resetLog4j("org.apache.ignite.internal.processors.cache.distributed.dht.preloader", Level.DEBUG, false, 0); + + //G.start("modules/tests/config/spring-multicache.xml"); + //G.start("examples/config/example-cache.xml"); + + G.start(); + + // Wait until Ok is pressed. + JOptionPane.showMessageDialog( + null, + new JComponent[] { + new JLabel("GridGain started."), + new JLabel( + "<html>" + + "You can use JMX console at <u>http://localhost:1234</u>" + + "</html>"), + new JLabel("Press OK to stop GridGain.") + }, + "GridGain Startup JUnit", + JOptionPane.INFORMATION_MESSAGE + ); + + G.stop(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java new file mode 100644 index 0000000..89c4c6f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Tests grid stop with jobs canceling. + */ +@GridCommonTest(group = "Kernal Self") +public class GridStopWithCancelSelfTest extends GridCommonAbstractTest { + /** */ + private static CountDownLatch cnt; + + /** */ + private static volatile boolean cancelCorrect; + + /** + * Constructor. + */ + public GridStopWithCancelSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10000; + } + + /** + * @throws Exception If an error occurs. + */ + public void testStopGrid() throws Exception { + cancelCorrect = false; + + cnt = new CountDownLatch(1); + + try { + Ignite ignite = startGrid("testGrid"); + + executeAsync(ignite.compute(), CancelledTask.class, null); + + cnt.await(); + } + finally { + stopGrid("testGrid", true); + } + + assert cancelCorrect; + } + + /** + * Test task that will be canceled. + */ + @SuppressWarnings({"PublicInnerClass"}) + public static final class CancelledTask extends ComputeTaskAdapter<String, Object> { + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable String arg) throws IgniteCheckedException { + for (ClusterNode node : subgrid) { + if (node.id().equals(ignite.configuration().getNodeId())) { + return Collections.singletonMap(new ComputeJobAdapter() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public void cancel() { + cancelCorrect = true; + } + + @Override public Serializable execute() throws IgniteCheckedException { + cnt.countDown(); + + try { + Thread.sleep(Long.MAX_VALUE); + } + catch (InterruptedException e) { + throw new IgniteCheckedException(e); + } + + return null; + } + }, node); + } + } + + throw new IgniteCheckedException("Local node not found"); + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithWaitSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithWaitSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithWaitSelfTest.java new file mode 100644 index 0000000..58ff860 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithWaitSelfTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.failover.always.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Tests waiting for unfinished tasks while stopping the grid. + */ +@GridCommonTest(group = "Kernal Self") +public class GridStopWithWaitSelfTest extends GridCommonAbstractTest { + /** Initial node that job has been mapped to. */ + private static final AtomicReference<ClusterNode> nodeRef = new AtomicReference<>(null); + + /** */ + private static CountDownLatch jobStarted; + + /** + * + */ + public GridStopWithWaitSelfTest() { + super(/*start Grid*/false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setFailoverSpi(new AlwaysFailoverSpi()); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testWait() throws Exception { + /* Runs GridWaitTask from node 1 (withTimeout(10000)), passing node 1's own ID as the argument, blocks until the job has started, stops both nodes with the second argument of G.stop set to false and then expects the task result 1. */ + jobStarted = new CountDownLatch(1); + + ComputeTaskFuture<Object> fut = null; + + try { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + assert ignite1 != null; + assert ignite2 != null; + + fut = executeAsync(ignite1.compute().withTimeout(10000), + GridWaitTask.class.getName(), + ignite1.cluster().localNode().id()); + + jobStarted.await(); + } + finally { + // Do not cancel but wait. + G.stop(getTestGridName(1), false); + G.stop(getTestGridName(2), false); + } + + assert fut != null; + + Integer res = (Integer)fut.get(); + + assert res == 1; + } + + /** + * Runs {@code JobFailTask} with the argument {@code "1"} on two nodes, waits up to + * {@code timeout} milliseconds for the job to start, stops both nodes without cancelling + * and then expects the task result {@code 1}. + * + * @throws Exception If failed. + */ + public void testWaitFailover() throws Exception { + jobStarted = new CountDownLatch(1); + + ComputeTaskFuture<Object> fut = null; + + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + try { + assert ignite1 != null; + assert ignite2 != null; + + long timeout = 3000; + + fut = executeAsync(ignite1.compute().withTimeout(timeout), JobFailTask.class.getName(), "1"); + + /* NOTE(review): the boolean returned by await() is ignored, so the test goes on to stop the nodes even if the job did not start within the timeout - confirm that this is intended. */ + jobStarted.await(timeout, TimeUnit.MILLISECONDS); + } + finally { + // Do not cancel but wait. + G.stop(getTestGridName(1), false); + G.stop(getTestGridName(2), false); + } + + assert fut != null; + + Integer res = (Integer)fut.get(); + + assert res == 1; + } + + /** + * Task that maps a single job to the node whose ID is passed as the task argument; + * the job releases {@code jobStarted} and returns {@code 1}. + */ + @ComputeTaskSessionFullSupport + private static class GridWaitTask extends ComputeTaskAdapter<UUID, Integer> { + /** {@inheritDoc} */ + @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, UUID arg) throws IgniteCheckedException { + ClusterNode mappedNode = null; + + for (ClusterNode node : subgrid) { + if (node.id().equals(arg)) { + mappedNode = node; + + break; + } + } + + assert mappedNode != null; + + return Collections.singletonMap(new ComputeJobAdapter(arg) { + @Override public Integer execute() { + jobStarted.countDown(); + + return 1; + } + }, mappedNode); + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return results.get(0).getData(); + } + } + + /** + * + */ + @ComputeTaskSessionFullSupport + private static class JobFailTask implements ComputeTask<String, Object> { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) throws IgniteCheckedException { + ses.setAttribute("fail", true); + + ClusterNode node = F.view(subgrid, F.remoteNodes(ignite.configuration().getNodeId())).iterator().next(); + + nodeRef.set(node); + + return Collections.singletonMap(new ComputeJobAdapter(arg) { + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Logger. 
*/ + @IgniteLoggerResource + private IgniteLogger log; + + @Override public Serializable execute() throws IgniteCheckedException { + jobStarted.countDown(); + + log.info("Starting to execute job with fail attribute: " + ses.getAttribute("fail")); + + boolean fail; + + assert ignite != null; + + UUID locId = ignite.configuration().getNodeId(); + + assert locId != null; + + try { + fail = ses.waitForAttribute("fail", 0); + } + catch (InterruptedException e) { + throw new IgniteCheckedException("Got interrupted while waiting for attribute to be set.", e); + } + + log.info("Failed attribute: " + fail); + + // While the "fail" attribute is set, this must be the initially mapped node: clear the attribute and throw so that the task's result() can request failover. + if (fail) { + ses.setAttribute("fail", false); + + assert nodeRef.get().id().equals(locId); + + log.info("Throwing grid exception from job."); + + throw new IgniteCheckedException("Job exception."); + } + + assert !nodeRef.get().id().equals(locId); + + Integer res = Integer.parseInt(this.<String>argument(0)); + + log.info("Returning job result: " + res); + + // The "fail" attribute has been cleared and this is not the initially mapped node: return the parsed argument as the job result. 
+ return res; + } + }, node); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { + if (res.getException() != null && !(res.getException() instanceof ComputeUserUndeclaredException)) { + assert res.getNode().id().equals(nodeRef.get().id()); + + return ComputeJobResultPolicy.FAILOVER; + } + + assert !res.getNode().id().equals(nodeRef.get().id()); + + return ComputeJobResultPolicy.REDUCE; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> res) throws IgniteCheckedException { + assert res.size() == 1; + + assert nodeRef.get() != null; + + assert !res.get(0).getNode().id().equals(nodeRef.get().id()) : + "Initial node and result one are the same (should be different)."; + + return res.get(0).getData(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskCancelSingleNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskCancelSingleNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskCancelSingleNodeSelfTest.java new file mode 100644 index 0000000..c587e5d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskCancelSingleNodeSelfTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Test for task cancellation issue. + * <p/> + * http://www.gridgainsystems.com/jiveforums/thread.jspa?messageID=8034 + */ +public class GridTaskCancelSingleNodeSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopGrid(); + } + + /** + * @throws Exception If failed. + */ + public void testImmediateCancellation() throws Exception { + checkCancellation(0L); + } + + /** + * @throws Exception If failed. + */ + public void testCancellation() throws Exception { + checkCancellation(2000L); + } + + /** + * @param timeoutBeforeCancel Timeout. + * @throws Exception If failed. 
+ */ + @SuppressWarnings("ErrorNotRethrown") + private void checkCancellation(long timeoutBeforeCancel) throws Exception { + final AtomicInteger finished = new AtomicInteger(); + final AtomicInteger cancelled = new AtomicInteger(); + final AtomicInteger rejected = new AtomicInteger(); + + /* Count the job FINISHED / CANCELLED / REJECTED events delivered to this local listener; any other event type trips an assertion. */ + grid().events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + info("Received event: " + evt); + + switch (evt.type()) { + case EVT_JOB_FINISHED: + finished.incrementAndGet(); + + break; + + case EVT_JOB_CANCELLED: + cancelled.incrementAndGet(); + + break; + + case EVT_JOB_REJECTED: + rejected.incrementAndGet(); + + break; + + default: + assert false : "Unexpected event: " + evt; + } + + return true; + } + }, EVT_JOB_FINISHED, EVT_JOB_CANCELLED, EVT_JOB_REJECTED); + + /* Start TestTask asynchronously, optionally wait timeoutBeforeCancel milliseconds, then cancel its future. */ + IgniteCompute comp = grid().compute().enableAsync(); + + comp.execute(TestTask.class, null); + + ComputeTaskFuture<?> fut = comp.future(); + + if (timeoutBeforeCancel > 0L) + Thread.sleep(timeoutBeforeCancel); + + assert fut.cancel(); + + /* Check the counters three times, 500 ms apart: no events at all are expected after an immediate cancel, otherwise exactly one FINISHED and one CANCELLED event and no REJECTED event. */ + for (int i = 0; i < 3; i++) { + try { + if (timeoutBeforeCancel == 0L) + assert (finished.get() == 0 && cancelled.get() == 0 && rejected.get() == 0) : + "Failed on iteration [i=" + i + ", finished=" + finished.get() + + ", cancelled=" + cancelled.get() + ", rejected=" + rejected.get() + ']'; + else + assert (finished.get() == 1 && cancelled.get() == 1 && rejected.get() == 0) : + "Failed on iteration [i=" + i + ", finished=" + finished.get() + + ", cancelled=" + cancelled.get() + ", rejected=" + rejected.get() + ']'; + } + catch (AssertionError e) { + info("Check failed: " + e.getMessage()); + + /* NOTE(review): a failed check is rethrown only for the immediate-cancel case on the last iteration; for a non-zero timeout it is merely logged - confirm that this is intended. */ + if (timeoutBeforeCancel == 0L && i == 2) + throw e; + } + + if (i < 2) + U.sleep(500); + } + + /* Getting the result of the cancelled future must throw IgniteFutureCancelledException. */ + try { + fut.get(); + + assert false; + } + catch (IgniteFutureCancelledException e) { + info("Caught expected exception: " + e); + } + } + + /** + * Task that splits into a single job; the job logs the local node ID, sleeps for 5000 ms + * (restoring the interrupt flag if interrupted) and returns {@code null}. + */ + @ComputeTaskMapAsync + private static class TestTask extends 
ComputeTaskSplitAdapter<Void, Void> { + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, Void arg) { + return F.asSet(new ComputeJobAdapter() { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteInstanceResource + private Ignite g; + + /** {@inheritDoc} */ + @Override public Object execute() { + log.info("Executing job on node: " + g.cluster().localNode().id()); + + try { + Thread.sleep(5000); + } + catch (InterruptedException ignored) { + log.info("Job thread has been interrupted."); + + Thread.currentThread().interrupt(); + } + + return null; + } + }); + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskContinuousMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskContinuousMapperSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskContinuousMapperSelfTest.java new file mode 100644 index 0000000..481018f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskContinuousMapperSelfTest.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * {@link org.apache.ignite.compute.ComputeTaskContinuousMapper} test. + */ +@GridCommonTest(group = "Kernal Self") +public class GridTaskContinuousMapperSelfTest extends GridCommonAbstractTest { + /** + * @throws Exception If test failed. + */ + public void testContinuousMapperMethods() throws Exception { + try { + Ignite ignite = startGrid(0); + startGrid(1); + + ignite.compute().execute(TestAllMethodsTask.class, null); + } + finally { + stopGrid(0); + stopGrid(1); + } + } + + /** + * @throws Exception If test failed. + */ + public void testContinuousMapperLifeCycle() throws Exception { + try { + Ignite ignite = startGrid(0); + + ignite.compute().execute(TestLifeCycleTask.class, null); + } + finally { + stopGrid(0); + } + } + + /** + * @throws Exception If test failed. 
+ */ + public void testContinuousMapperNegative() throws Exception { + try { + Ignite ignite = startGrid(0); + + ignite.compute().execute(TestNegativeTask.class, null); + } + finally { + stopGrid(0); + } + } + + /** */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestAllMethodsTask extends ComputeTaskAdapter<Object, Object> { + /** */ + @SuppressWarnings({"UnusedDeclaration"}) + @IgniteTaskContinuousMapperResource + private ComputeTaskContinuousMapper mapper; + + /** */ + private int cnt; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) throws IgniteCheckedException { + assert subgrid.size() == 2; + + mapper.send(new TestJob(cnt++), subgrid.get(0)); + + Map<ComputeJob, ClusterNode> mappedJobs = new HashMap<>(2); + + mappedJobs.put(new TestJob(cnt++), subgrid.get(0)); + mappedJobs.put(new TestJob(cnt++), subgrid.get(1)); + + mapper.send(mappedJobs); + + mapper.send(new TestJob(cnt++)); + + int size = subgrid.size(); + + Collection<ComputeJob> jobs = new ArrayList<>(size); + + for (ClusterNode n : subgrid) + jobs.add(new TestJob(cnt++)); + + mapper.send(jobs); + + return null; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results.size() == cnt : "Unexpected result count: " + results.size(); + + return null; + } + } + + /** */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestLifeCycleTask extends ComputeTaskAdapter<Object, Object> { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + private ComputeTaskContinuousMapper mapper; + + /** + * @param mapper Continuous mapper. + * @throws IgniteCheckedException Thrown if any exception occurs. 
+ */ + @SuppressWarnings("unused") + @IgniteTaskContinuousMapperResource + private void setMapper(ComputeTaskContinuousMapper mapper) throws IgniteCheckedException { + this.mapper = mapper; + + /* First job: sent as soon as the mapper is handed to this setter. */ + mapper.send(new TestJob()); + } + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) throws IgniteCheckedException { + /* Second job: sent from map(); no further jobs are returned from map() itself. */ + mapper.send(new TestJob()); + + return null; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) throws IgniteCheckedException { + ComputeJobResultPolicy plc = super.result(res, received); + + /* Third job: sent once the list of received results holds two entries. */ + if (received != null && received.size() == 2) + mapper.send(new TestJob()); + + return plc; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + /* One result is expected for each of the three jobs sent above. */ + assert results.size() == 3 : "Unexpected result count: " + results.size(); + + ClusterNode node = results.get(0).getNode(); + + /* Every send() overload called from reduce() is expected to throw IgniteCheckedException. */ + try { + mapper.send(new TestJob(), node); + + assert false; + } + catch (IgniteCheckedException e) { + if (log.isInfoEnabled()) + log.info("Expected exception: " + e); + } + + try { + mapper.send(Collections.singletonMap(new TestJob(), node)); + + assert false; + } + catch (IgniteCheckedException e) { + if (log.isInfoEnabled()) + log.info("Expected exception: " + e); + } + + try { + mapper.send(new TestJob()); + + assert false; + } + catch (IgniteCheckedException e) { + if (log.isInfoEnabled()) + log.info("Expected exception: " + e); + } + + try { + mapper.send(Collections.singleton(new TestJob())); + + assert false; + } + catch (IgniteCheckedException e) { + if (log.isInfoEnabled()) + log.info("Expected exception: " + e); + } + + return null; + } + } + + /** Task that passes {@code null} arguments to the mapper's {@code send()} overloads, expects each call to throw, and finally sends one valid job. */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestNegativeTask extends ComputeTaskAdapter<Object, Object> { + /** */ + @SuppressWarnings({"UnusedDeclaration"}) + @IgniteTaskContinuousMapperResource + 
private ComputeTaskContinuousMapper mapper; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) throws IgniteCheckedException { + try { + mapper.send(new TestJob(), null); + + assert false; + + } + catch (NullPointerException e) { + if (log.isInfoEnabled()) + log.info("Expected exception: " + e); + } + + try { + mapper.send(null, subgrid.get(0)); + + assert false; + } + catch (NullPointerException e) { + if (log.isInfoEnabled()) + log.info("Expected exception: " + e); + } + + try { + mapper.send((Map<? extends ComputeJob, ClusterNode>)null); + + assert false; + } + catch (NullPointerException e) { + if (log.isInfoEnabled()) + log.info("Expected exception: " + e); + } + + try { + mapper.send(Collections.singletonMap(new TestJob(), (ClusterNode)null)); + + assert false; + } + catch (IgniteCheckedException e) { + if (log.isInfoEnabled()) + log.info("Expected exception: " + e); + } + + try { + mapper.send((ComputeJob)null); + + assert false; + } + catch (NullPointerException e) { + if (log.isInfoEnabled()) + log.info("Expected exception: " + e); + } + + try { + mapper.send((Collection<ComputeJob>)null); + + assert false; + } + catch (NullPointerException e) { + if (log.isInfoEnabled()) + log.info("Expected exception: " + e); + } + + try { + mapper.send(Collections.singleton((ComputeJob)null)); + + assert false; + } + catch (IgniteCheckedException e) { + if (log.isInfoEnabled()) + log.info("Expected exception: " + e); + } + + mapper.send(new TestJob()); + + return null; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results.size() == 1; + + return null; + } + } + + /** */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestJob extends ComputeJobAdapter { + /** */ + public TestJob() { + super(-1); + } + + /** + * @param idx Index. 
+ */ + public TestJob(int idx) { + super(idx); + } + + /** {@inheritDoc} */ + @Override public Serializable execute() throws IgniteCheckedException { + return argument(0); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionContextSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionContextSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionContextSelfTest.java new file mode 100644 index 0000000..9ae2a78 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionContextSelfTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Tests for the {@code withXXX(..)} methods of the compute facade returned by {@code Ignite.compute()} + * ({@code withName(..)} and {@code withNoFailover()}). + */ +public class GridTaskExecutionContextSelfTest extends GridCommonAbstractTest { + /** Counts executions of the test closure and test job; reset to zero before each test. */ + private static final AtomicInteger CNT = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + CNT.set(0); + } + + /** + * @throws Exception If failed.
+ */ + public void testWithName() throws Exception { + Callable<String> f = new IgniteCallable<String>() { + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + @Override public String call() { + return ses.getTaskName(); + } + }; + + Ignite g = grid(0); + + assert "name1".equals(g.compute().withName("name1").call(f)); + assert "name2".equals(g.compute().withName("name2").call(f)); + assert f.getClass().getName().equals(g.compute().call(f)); + + assert "name1".equals(g.compute().withName("name1").execute(new TestTask(false), null)); + assert "name2".equals(g.compute().withName("name2").execute(new TestTask(false), null)); + assert TestTask.class.getName().equals(g.compute().execute(new TestTask(false), null)); + } + + /** + * @throws Exception If failed. + */ + public void testWithNoFailoverClosure() throws Exception { + final Runnable r = new GridAbsClosureX() { + @Override public void applyx() throws IgniteCheckedException { + CNT.incrementAndGet(); + + throw new ComputeExecutionRejectedException("Expected error."); + } + }; + + final Ignite g = grid(0); + + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Override public Object call() throws Exception { + g.compute().withNoFailover().run(r); + + return null; + } + }, + ComputeExecutionRejectedException.class, + "Expected error." + ); + + assertEquals(1, CNT.get()); + } + + /** + * @throws Exception If failed. + */ + public void testWithNoFailoverTask() throws Exception { + final Ignite g = grid(0); + + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Override public Object call() throws Exception { + g.compute().withNoFailover().execute(new TestTask(true), null); + + return null; + } + }, + ComputeExecutionRejectedException.class, + "Expected error." + ); + + assertEquals(1, CNT.get()); + } + + /** + * Test task that returns its name.
+ */ + private static class TestTask extends ComputeTaskSplitAdapter<Void, String> { + /** Whether the job should throw {@code ComputeExecutionRejectedException} instead of returning the task name. */ + private final boolean fail; + + /** + * @param fail Whether to fail. + */ + private TestTask(boolean fail) { + this.fail = fail; + } + + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, Void arg) throws IgniteCheckedException { + return F.asSet(new ComputeJobAdapter() { + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + @Override public Object execute() throws IgniteCheckedException { + CNT.incrementAndGet(); + + if (fail) + throw new ComputeExecutionRejectedException("Expected error."); + + return ses.getTaskName(); + } + }); + } + + /** {@inheritDoc} */ + @Override public String reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return F.first(results).getData(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java new file mode 100644 index 0000000..a843394 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * Task execution test. + */ +@GridCommonTest(group = "Kernal Self") +public class GridTaskExecutionSelfTest extends GridCommonAbstractTest { + /** Grid instance. */ + private Ignite ignite; + + /** Creates the test; the grids it uses are started explicitly in {@code beforeTestsStarted()}. */ + public GridTaskExecutionSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite = startGrid(1); + + startGrid(2); + startGrid(3); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(1); + stopGrid(2); + stopGrid(3); + + ignite = null; + } + + /** + * Runs {@code GridTestTask} through a compute facade obtained via {@code enableAsync()}, checks that + * {@code execute(..)} itself returns {@code null}, and then waits on the future returned by {@code future()}. + * + * @throws Exception If failed.
+ */ + public void testSynchronousExecute() throws Exception { + IgniteCompute comp = ignite.compute().enableAsync(); + + assertNull(comp.execute(GridTestTask.class, "testArg")); + + ComputeTaskFuture<?> fut = comp.future(); + + assert fut != null; + + info("Task result: " + fut.get()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverSelfTest.java new file mode 100644 index 0000000..2ae3970 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverSelfTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * Test for task failover.
+ */ +@GridCommonTest(group = "Kernal Self") +public class GridTaskFailoverSelfTest extends GridCommonAbstractTest { + /** Number of jobs the task is split into. Do not change this value. */ + public static final int SPLIT_COUNT = 2; + + /** Creates the test; the grid is started and stopped inside {@code testFailover()}. */ + public GridTaskFailoverSelfTest() { + super(false); + } + + /** + * @throws Exception If test failed. + */ + @SuppressWarnings("unchecked") + public void testFailover() throws Exception { + Ignite ignite = startGrid(); + + try { + ignite.compute().localDeployTask(GridFailoverTestTask.class, GridFailoverTestTask.class.getClassLoader()); + + ComputeTaskFuture<?> fut = ignite.compute().execute(GridFailoverTestTask.class.getName(), null); + + assert fut != null; + + fut.get(); + + assert false : "Should never be reached due to exception thrown."; + } + catch (ClusterTopologyException e) { + info("Received correct exception: " + e); + } + finally { + stopGrid(); + } + } + + /** Task that rethrows a job's exception and answers every exception-free job result with {@code FAILOVER}. */ + @SuppressWarnings({"PublicInnerClass"}) + public static class GridFailoverTestTask extends ComputeTaskSplitAdapter<Serializable, Integer> { + /** Logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Collection<ComputeJobAdapter> split(int gridSize, Serializable arg) { + if (log.isInfoEnabled()) + log.info("Splitting job [job=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']'); + + Collection<ComputeJobAdapter> jobs = new ArrayList<>(SPLIT_COUNT); + + for (int i = 0; i < SPLIT_COUNT; i++) + jobs.add(new ComputeJobAdapter() { + @Override public Serializable execute() { + if (log.isInfoEnabled()) + log.info("Computing job [job=" + this + ']'); + + return null; + } + }); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) throws + IgniteCheckedException { + if (res.getException() != null) + throw res.getException(); + + return ComputeJobResultPolicy.FAILOVER; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult>
results) { + if (log.isInfoEnabled()) + log.info("Reducing job [job=" + this + ", results=" + results + ']'); + + /* NOTE(review): the jobs created in split() return null, so the Integer unboxing below would throw NullPointerException if reduce() were ever reached with their results - confirm it is unreachable. */ + int res = 0; + + for (ComputeJobResult result : results) + res += (Integer)result.getData(); + + return res; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java new file mode 100644 index 0000000..bf6eafb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Test for task future when grid stops. + */ +@GridCommonTest(group = "Kernal Self") +@SuppressWarnings({"UnusedDeclaration"}) +public class GridTaskFutureImplStopGridSelfTest extends GridCommonAbstractTest { + /** Maximum time, in milliseconds, to wait for the jobs to start and for the future thread to finish. */ + private static final int WAIT_TIME = 5000; + + /** Number of jobs the task is split into. */ + public static final int SPLIT_COUNT = 5; + + /** Counted down once by every started job; the future thread waits on it before calling {@code fut.get()}. */ + private static final CountDownLatch startSignal = new CountDownLatch(SPLIT_COUNT); + + /** Monitor guarding {@link #cnt}. */ + private static final Object mux = new Object(); + + /** Number of jobs that have started executing; guarded by {@link #mux}. */ + @SuppressWarnings({"StaticNonFinalField"}) + private static int cnt; + + /** Creates the test; the grid is started and stopped inside {@code testGet()}. */ + public GridTaskFutureImplStopGridSelfTest() { + super(false); + } + + /** + * Starts a task whose jobs sleep until interrupted, stops the grid while a background thread is blocked in + * {@code fut.get()}, and checks that the thread finishes and that any failure it saw carries the expected message. + * + * @throws Exception If test failed. + */ + public void testGet() throws Exception { + Ignite ignite = startGrid(getTestGridName()); + + Thread futThread = null; + + try { + final ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridStopTestTask.class.getName(), null); + + fut.listenAsync(new CI1<IgniteFuture>() { + @SuppressWarnings({"NakedNotify"}) + @Override public void apply(IgniteFuture gridFut) { + synchronized (mux) { + mux.notifyAll(); + } + } + }); + + final CountDownLatch latch = new CountDownLatch(1); + + final AtomicBoolean failed = new AtomicBoolean(false); + + /* Holds a failure whose message is not the expected one, so that it can be reported from the test thread (an assert inside the background thread would not fail the test). */ + final AtomicReference<Throwable> unexpected = new AtomicReference<>(); + + futThread = new Thread(new Runnable() { + /** {@inheritDoc} */ + @Override public void run() { + try { + startSignal.await(); + + Object res = fut.get(); + + info("Task result: " + res); + } + catch (Throwable e) { + failed.set(true); + + // Make sure that message contains info about stopping grid.
+ String msg = e.getMessage(); + + if (msg == null || !msg.startsWith("Task failed due to stopping of the grid:")) + unexpected.set(e); + } + finally { + latch.countDown(); + } + } + + }, "test-task-future-thread"); + + futThread.start(); + + long delta = WAIT_TIME; + long end = System.currentTimeMillis() + delta; + + synchronized (mux) { + while (cnt < SPLIT_COUNT && delta > 0) { + mux.wait(delta); + + delta = end - System.currentTimeMillis(); + } + } + + // Stops grid. + stopGrid(getTestGridName()); + + boolean finished = latch.await(WAIT_TIME, TimeUnit.MILLISECONDS); + + info("Future thread [alive=" + futThread.isAlive() + ']'); + + info("Test task result [failed=" + failed.get() + ", taskFuture=" + fut + ']'); + + assert finished : "Future thread was not stopped."; + + assert unexpected.get() == null : "Unexpected task failure: " + unexpected.get(); + + assert fut.isDone(); + } + finally { + if (futThread != null && futThread.isAlive()) { + info("Task future thread interruption."); + + futThread.interrupt(); + } + + if (G.state(getTestGridName()) != IgniteState.STOPPED) + stopGrid(getTestGridName()); + } + } + + /** Task that splits into {@code SPLIT_COUNT} {@link GridStopTestJob}s and sums their integer results. */ + @SuppressWarnings({"PublicInnerClass", "UnusedDeclaration"}) + public static class GridStopTestTask extends ComputeTaskSplitAdapter<Object, Object> { + /** Logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Collection<?
extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException { + if (log.isInfoEnabled()) + log.info("Splitting job [job=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']'); + + Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT); + + for (int i = 0; i < SPLIT_COUNT; i++) + jobs.add(new GridStopTestJob()); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public Serializable reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + if (log.isInfoEnabled()) + log.info("Aggregating job [job=" + this + ", results=" + results + ']'); + + int res = 0; + + for (ComputeJobResult result : results) { + res += (Integer)result.getData(); + } + + return res; + } + } + + /** Job that signals its start, then sleeps until interrupted; returns {@code 1} if it was interrupted and {@code 0} otherwise. */ + @SuppressWarnings({"PublicInnerClass"}) + public static class GridStopTestJob extends ComputeJobAdapter { + /** Logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Serializable execute() { + if (log.isInfoEnabled()) + log.info("Executing job [job=" + this + ']'); + + startSignal.countDown(); + + synchronized (mux) { + cnt++; + + mux.notifyAll(); + } + + /* Thread.sleep() clears the thread's interrupt status when it throws InterruptedException, so the interruption is remembered explicitly instead of re-reading isInterrupted(). */ + boolean interrupted = false; + + try { + Thread.sleep(Integer.MAX_VALUE); + } + catch (InterruptedException ignore) { + interrupted = true; + + if (log.isInfoEnabled()) + log.info("Job got interrupted: " + this); + } + + if (!interrupted) + log.error("Job not interrupted: " + this); + + return !interrupted ?
0 : 1; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskInstanceExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskInstanceExecutionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskInstanceExecutionSelfTest.java new file mode 100644 index 0000000..80fe81f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskInstanceExecutionSelfTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +/** + * Task instance execution test.
+ */ +@SuppressWarnings("PublicInnerClass") +@GridCommonTest(group = "Kernal Self") +public class GridTaskInstanceExecutionSelfTest extends GridCommonAbstractTest { + /** State object handed to the task; the task asserts that it still holds this same instance in every callback. */ + private static Object testState; + + /** Creates the test, passing {@code true} to the base test constructor. */ + public GridTaskInstanceExecutionSelfTest() { + super(true); + } + + /** + * Executes a pre-built {@link GridStatefulTask} instance through an async-enabled compute facade and waits on + * the resulting future. + * + * @throws Exception If failed. + */ + public void testSynchronousExecute() throws Exception { + Ignite ignite = G.ignite(getTestGridName()); + + testState = 12345; + + GridStatefulTask task = new GridStatefulTask(testState); + + assert task.getState() != null; + assert task.getState() == testState; + + IgniteCompute comp = ignite.compute().enableAsync(); + + assertNull(comp.execute(task, "testArg")); + + ComputeTaskFuture<?> fut = comp.future(); + + assert fut != null; + + info("Task result: " + fut.get()); + } + + /** + * Stateful task. + */ + public static class GridStatefulTask extends GridTestTask { + /** State passed in through the constructor. */ + private Object state; + + /** Logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** + * @param state State. + */ + public GridStatefulTask(Object state) { + this.state = state; + } + + /** + * @return The state. + */ + public Object getState() { + return state; + } + + /** {@inheritDoc} */ + @Override public Collection<?
extends ComputeJob> split(int gridSize, Object arg) { + log.info("Task split state: " + state); + + assert state != null; + assert state == testState; + + return super.split(gridSize, arg); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) throws IgniteCheckedException { + log.info("Task result state: " + state); + + assert state != null; + assert state == testState; + + return super.result(res, received); + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + log.info("Task reduce state: " + state); + + assert state != null; + assert state == testState; + + return super.reduce(results); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskInstantiationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskInstantiationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskInstantiationSelfTest.java new file mode 100644 index 0000000..d920146 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskInstantiationSelfTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Tests instantiation of various task types (defined as private inner class, without default constructor, non-public + * default constructor). + */ +@GridCommonTest(group = "Kernal Self") +public class GridTaskInstantiationSelfTest extends GridCommonAbstractTest { + /** + * Constructor. + */ + public GridTaskInstantiationSelfTest() { + super(true); + } + + /** + * Executes {@code PrivateClassTask} and {@code NonPublicDefaultConstructorTask} by class, then expects + * executing {@code NoDefaultConstructorTask} by class to throw an exception. + * + * @throws Exception If an error occurs. + */ + public void testTasksInstantiation() throws Exception { + grid().compute().execute(PrivateClassTask.class, null); + + grid().compute().execute(NonPublicDefaultConstructorTask.class, null); + + try { + grid().compute().execute(NoDefaultConstructorTask.class, null); + + assert false : "Exception should have been thrown."; + } + catch (Exception e) { + info("Caught expected exception: " + e); + } + } + + /** + * Test task defined as private inner class. + */ + private static class PrivateClassTask extends ComputeTaskAdapter<String, Object> { + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public Map<?
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable String arg) throws IgniteCheckedException { + for (ClusterNode node : subgrid) + if (node.id().equals(ignite.configuration().getNodeId())) + return Collections.singletonMap(new ComputeJobAdapter() { + @Override public Serializable execute() { + return null; + } + }, node); + + throw new IgniteCheckedException("Local node not found."); + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) { + return null; + } + } + + /** + * Test task defined with non-public default constructor. + */ + @SuppressWarnings({"PublicInnerClass"}) + public static final class NonPublicDefaultConstructorTask extends PrivateClassTask { + /** + * No-op constructor. + */ + private NonPublicDefaultConstructorTask() { + // No-op. + } + } + + /** + * Test task defined without default constructor. + */ + @SuppressWarnings({"PublicInnerClass"}) + public static final class NoDefaultConstructorTask extends PrivateClassTask { + /** + * No-op constructor. + * + * @param param Some parameter. + */ + @SuppressWarnings({"unused"}) + private NoDefaultConstructorTask(Object param) { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskJobRejectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskJobRejectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskJobRejectSelfTest.java new file mode 100644 index 0000000..9b055a8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskJobRejectSelfTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.collision.fifoqueue.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Test that rejected job is not failed over. 
+ */ +public class GridTaskJobRejectSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(1); + startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(1); + stopGrid(2); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + FifoQueueCollisionSpi collision = new FifoQueueCollisionSpi(); + + collision.setParallelJobsNumber(1); + + cfg.setCollisionSpi(collision); + + return cfg; + } + + /** + * Maps two {@code SleepJob}s to the local node of grid 1 (configured with a parallel jobs number of 1), cancels + * the task once a job-started event has been seen, waits for the task to finish or fail, and checks that no + * job-failed-over event was recorded. + * + * @throws Exception If failed. + */ + public void testReject() throws Exception { + grid(1).events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + X.println("Task event: " + evt); + + return true; + } + }, EVTS_TASK_EXECUTION); + + grid(1).events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + X.println("Job event: " + evt); + + return true; + } + }, EVTS_JOB_EXECUTION); + + final CountDownLatch startedLatch = new CountDownLatch(1); + + grid(1).events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + startedLatch.countDown(); + + return true; + } + }, EVT_JOB_STARTED); + + final AtomicInteger failedOver = new AtomicInteger(0); + + grid(1).events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + failedOver.incrementAndGet(); + + return true; + } + }, EVT_JOB_FAILED_OVER); + + final CountDownLatch finishedLatch = new CountDownLatch(1); + + grid(1).events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + finishedLatch.countDown(); + + return true; + } + }, EVT_TASK_FINISHED, EVT_TASK_FAILED); + + final ClusterNode node =
grid(1).localNode(); + + IgniteCompute comp = grid(1).compute().enableAsync(); + + comp.execute(new ComputeTaskAdapter<Void, Void>() { + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Void arg) { + return F.asMap(new SleepJob(), node, new SleepJob(), node); + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + }, null); + + ComputeTaskFuture<?> fut = comp.future(); + + assert startedLatch.await(2, SECONDS); + + fut.cancel(); + + assert finishedLatch.await(2, SECONDS); + + assert failedOver.get() == 0; + } + + /** + * Sleeping job: sleeps for 10 seconds and, if interrupted, restores the thread's interrupt flag before returning {@code null}. + */ + private static final class SleepJob extends ComputeJobAdapter { + /** {@inheritDoc} */ + @Override public Object execute() { + try { + Thread.sleep(10000); + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskListenerSelfTest.java new file mode 100644 index 0000000..abdbcc0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskListenerSelfTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; +
/**
 * This test checks that GridTaskListener is only called once per task.
 */
@SuppressWarnings("deprecation")
@GridCommonTest(group = "Kernal Self")
public class GridTaskListenerSelfTest extends GridCommonAbstractTest {
    /** Maximum time to wait for the first listener notification, in milliseconds. */
    private static final long LSNR_WAIT_TIMEOUT = 30 * 1000;

    /** */
    public GridTaskListenerSelfTest() {
        super(/*start grid*/true);
    }

    /**
     * Checks that GridTaskListener is only called once per task.
     * <p>
     * Executes {@link TestTask} asynchronously, attaches a counting listener to the task future,
     * waits for the future and then for the first notification, and asserts the counter equals one.
     *
     * @throws Exception If failed.
     */
    @SuppressWarnings({"BusyWait", "unchecked"})
    public void testGridTaskListener() throws Exception {
        final AtomicInteger cnt = new AtomicInteger(0);

        IgniteInClosure<IgniteFuture<?>> lsnr = new CI1<IgniteFuture<?>>() {
            @Override public void apply(IgniteFuture<?> fut) {
                assert fut != null;

                cnt.incrementAndGet();
            }
        };

        Ignite ignite = G.ignite(getTestGridName());

        assert ignite != null;

        ignite.compute().localDeployTask(TestTask.class, TestTask.class.getClassLoader());

        ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), TestTask.class.getName(), null);

        fut.listenAsync(lsnr);

        fut.get();

        // Bound the wait so that a listener that is never notified fails the assertion below
        // instead of hanging the test.
        long deadline = System.currentTimeMillis() + LSNR_WAIT_TIMEOUT;

        while (cnt.get() == 0 && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException e) {
                error("Got interrupted while sleep.", e);

                // Preserve the interrupt status for the code that runs after this test method.
                Thread.currentThread().interrupt();

                break;
            }
        }

        assert cnt.get() == 1 : "Unexpected GridTaskListener apply count [count=" + cnt.get() + ", expected=1]";
    }

    /** Test task: splits into five jobs, each returning {@code 1}; reduces to {@code null}. */
    private static class TestTask extends ComputeTaskSplitAdapter<Serializable, Object> {
        /** {@inheritDoc} */
        @Override protected Collection<? extends ComputeJob> split(int gridSize, Serializable arg) throws IgniteCheckedException {
            Collection<ComputeJobAdapter> jobs = new ArrayList<>();

            for (int i = 0; i < 5; i++) {
                jobs.add(new ComputeJobAdapter() {
                    @Override public Serializable execute() {
                        return 1;
                    }
                });
            }

            return jobs;
        }

        /** {@inheritDoc} */
        @Override public Object reduce(List<ComputeJobResult> results) {
            return null;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskMapAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskMapAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskMapAsyncSelfTest.java new file mode 100644 index 0000000..88530c0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskMapAsyncSelfTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +
/**
 * Executes one task without and one task with the {@link ComputeTaskMapAsync} annotation and asserts,
 * inside {@code split(...)}, whether the mapping thread is the thread recorded in {@code BaseTask.mainThread}.
 */
@GridCommonTest(group = "Kernal Self")
public class GridTaskMapAsyncSelfTest extends GridCommonAbstractTest {
    /**
     * Passes {@code true} to the parent constructor.
     */
    public GridTaskMapAsyncSelfTest() {
        super(true);
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

        TcpDiscoverySpi disco = new TcpDiscoverySpi();

        disco.setIpFinder(ipFinder);

        cfg.setDiscoverySpi(disco);

        return cfg;
    }

    /**
     * Runs the synchronously mapped task first, then the asynchronously mapped one.
     *
     * @throws Exception If failed.
     */
    public void testTaskMap() throws Exception {
        Ignite node = G.ignite(getTestGridName());

        info("Executing sync mapped task.");

        node.compute().execute(SyncMappedTask.class, null);

        info("Executing async mapped task.");

        node.compute().execute(AsyncMappedTask.class, null);
    }

    /**
     * Common part of both tasks: records the mapping thread and produces a single job
     * that records and logs the thread it runs on.
     */
    private abstract static class BaseTask extends ComputeTaskSplitAdapter<Object, Void> {
        /** Thread that performed static initialization of this class. */
        protected static final Thread mainThread = Thread.currentThread();

        /** Thread that invoked {@code split(...)}. */
        protected Thread mapper;

        /** Thread that invoked the job's {@code execute()}. */
        protected Thread runner;

        /** Injected logger. */
        @IgniteLoggerResource
        protected IgniteLogger log;

        /** {@inheritDoc} */
        @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException {
            mapper = Thread.currentThread();

            ComputeJob job = new ComputeJobAdapter() {
                @Override public Serializable execute() {
                    runner = Thread.currentThread();

                    log.info("Runner: " + runner);
                    log.info("Main: " + mainThread);
                    log.info("Mapper: " + mapper);

                    return null;
                }
            };

            return Collections.singleton(job);
        }

        /** {@inheritDoc} */
        @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
            return null;
        }
    }

    /**
     * Task without the async-map annotation; asserts that mapping ran on {@code mainThread}.
     */
    private static class SyncMappedTask extends BaseTask {
        /** {@inheritDoc} */
        @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException {
            Collection<? extends ComputeJob> jobs = super.split(gridSize, arg);

            assert mainThread == mapper;

            return jobs;
        }
    }

    /**
     * Task annotated with {@link ComputeTaskMapAsync}; asserts that mapping did not run on {@code mainThread}.
     */
    @ComputeTaskMapAsync
    private static class AsyncMappedTask extends BaseTask {
        /** {@inheritDoc} */
        @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException {
            Collection<? extends ComputeJob> jobs = super.split(gridSize, arg);

            assert mainThread != mapper;

            return jobs;
        }
    }
}