http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java new file mode 100644 index 0000000..384b953 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Test event storage. + */ +@GridCommonTest(group = "Kernal Self") +public class GridEventStorageCheckAllEventsSelfTest extends GridCommonAbstractTest { + /** */ + private static Ignite ignite; + + /** + * + */ + public GridEventStorageCheckAllEventsSelfTest() { + super(/*start grid*/true); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite = G.ignite(getTestGridName()); + + long tstamp = startTimestamp(); + + ignite.compute().localDeployTask(GridAllEventsTestTask.class, GridAllEventsTestTask.class.getClassLoader()); + + List<IgniteEvent> evts = pullEvents(tstamp, 1); + + assertEvent(evts.get(0).type(), EVT_TASK_DEPLOYED, evts); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + ignite = null; + } + + /** + * @param evtType Actual event type. + * @param expType Expected event type. + * @param evts Full list of events. + */ + private void assertEvent(int evtType, int expType, List<IgniteEvent> evts) { + assert evtType == expType : "Invalid event [evtType=" + evtType + ", expectedType=" + expType + + ", evts=" + evts + ']'; + } + + /** + * @throws Exception If test failed. + */ + public void testCheckpointEvents() throws Exception { + long tstamp = startTimestamp(); + + generateEvents(null, new GridAllCheckpointEventsTestJob()).get(); + + List<IgniteEvent> evts = pullEvents(tstamp, 11); + + assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); + assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); + assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); + assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); + assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts); + assertEvent(evts.get(5).type(), EVT_CHECKPOINT_LOADED, evts); + assertEvent(evts.get(6).type(), EVT_CHECKPOINT_REMOVED, evts); + assertEvent(evts.get(7).type(), EVT_JOB_RESULTED, evts); + assertEvent(evts.get(8).type(), EVT_TASK_REDUCED, evts); + assertEvent(evts.get(9).type(), EVT_TASK_FINISHED, evts); + assertEvent(evts.get(10).type(), EVT_JOB_FINISHED, evts); + } + + /** + * @throws Exception If test failed. + */ + public void testTaskUndeployEvents() throws Exception { + long tstamp = startTimestamp(); + + generateEvents(null, new GridAllEventsSuccessTestJob()).get(); + + ignite.compute().undeployTask(GridAllEventsTestTask.class.getName()); + ignite.compute().localDeployTask(GridAllEventsTestTask.class, GridAllEventsTestTask.class.getClassLoader()); + + List<IgniteEvent> evts = pullEvents(tstamp, 12); + + assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); + assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); + assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); + assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); + assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts); + assertEvent(evts.get(5).type(), EVT_CHECKPOINT_REMOVED, evts); + assertEvent(evts.get(6).type(), EVT_JOB_RESULTED, evts); + assertEvent(evts.get(7).type(), EVT_TASK_REDUCED, evts); + assertEvent(evts.get(8).type(), EVT_TASK_FINISHED, evts); + assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts); + assertEvent(evts.get(10).type(), EVT_TASK_UNDEPLOYED, evts); + assertEvent(evts.get(11).type(), EVT_TASK_DEPLOYED, evts); + } + + /** + * @throws Exception If test failed. + */ + public void testSuccessTask() throws Exception { + long tstamp = startTimestamp(); + + generateEvents(null, new GridAllEventsSuccessTestJob()).get(); + + List<IgniteEvent> evts = pullEvents(tstamp, 10); + + assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); + assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); + assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); + assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); + assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts); + assertEvent(evts.get(5).type(), EVT_CHECKPOINT_REMOVED, evts); + assertEvent(evts.get(6).type(), EVT_JOB_RESULTED, evts); + assertEvent(evts.get(7).type(), EVT_TASK_REDUCED, evts); + assertEvent(evts.get(8).type(), EVT_TASK_FINISHED, evts); + assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts); + } + + /** + * @throws Exception If test failed. + */ + public void testFailTask() throws Exception { + long tstamp = startTimestamp(); + + ComputeTaskFuture<?> fut = generateEvents(null, new GridAllEventsFailTestJob()); + + try { + fut.get(); + + assert false : "Grid with locally executed job with timeout should throw GridComputeTaskTimeoutException."; + } + catch (IgniteCheckedException e) { + info("Expected exception caught [taskFuture=" + fut + ", exception=" + e + ']'); + } + + List<IgniteEvent> evts = pullEvents(tstamp, 7); + + assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); + assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); + assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); + assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); + assertEvent(evts.get(4).type(), EVT_JOB_RESULTED, evts); + assertEvent(evts.get(5).type(), EVT_TASK_FAILED, evts); + assertEvent(evts.get(6).type(), EVT_JOB_FAILED, evts); + } + + /** + * @throws Exception If test failed. + */ + public void testTimeoutTask() throws Exception { + long tstamp = startTimestamp(); + + ComputeTaskFuture<?> fut = generateEvents(1000L, new GridAllEventsTimeoutTestJob()); + + try { + fut.get(); + + assert false : "Task should fail."; + } + catch (ComputeTaskTimeoutException e) { + info("Expected timeout exception caught [taskFuture=" + fut + ", exception=" + e + ']'); + } + + List<IgniteEvent> evts = pullEvents(tstamp, 6); + + assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); + assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); + assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); + assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); + + boolean isTaskTimeout = false; + boolean isTaskFailed = false; + + for (int i = 4; i < evts.size(); i++) { + int evtType = evts.get(i).type(); + + if (evtType == EVT_TASK_TIMEDOUT) { + assert !isTaskTimeout; + assert !isTaskFailed; + + isTaskTimeout = true; + } + else if (evtType == EVT_TASK_FAILED) { + assert isTaskTimeout; + assert !isTaskFailed; + + isTaskFailed = true; + } + else { + assert evtType == EVT_JOB_CANCELLED + || evtType == EVT_JOB_TIMEDOUT + || evtType == EVT_JOB_FAILED + || evtType == EVT_JOB_FINISHED : + "Unexpected event: " + evts.get(i); + } + } + + assert isTaskTimeout; + assert isTaskFailed; + } + + /** + * Returns timestamp at the method call moment, but sleeps before return, + * to allow pass {@link GridUtils#currentTimeMillis()}. + * + * @return Call timestamp. + * @throws InterruptedException If sleep was interrupted. + */ + private long startTimestamp() throws InterruptedException { + long tstamp = System.currentTimeMillis(); + + Thread.sleep(20); + + return tstamp; + } + + /** + * Pull all test task related events since the given moment. + * + * @param since Earliest time to pulled events. + * @param evtCnt Expected event count + * @return List of events. + * @throws Exception If failed. + */ + private List<IgniteEvent> pullEvents(long since, int evtCnt) throws Exception { + IgnitePredicate<IgniteEvent> filter = new CustomEventFilter(GridAllEventsTestTask.class.getName(), since); + + for (int i = 0; i < 3; i++) { + List<IgniteEvent> evts = new ArrayList<>(ignite.events().localQuery((filter))); + + info("Filtered events [size=" + evts.size() + ", evts=" + evts + ']'); + + if (evtCnt != evts.size() && i < 2) { + U.warn(log, "Invalid event count (will retry in 1000 ms) [actual=" + evts.size() + + ", expected=" + evtCnt + ", evts=" + evts + ']'); + + U.sleep(1000); + + continue; + } + + assert evtCnt <= evts.size() : "Invalid event count [actual=" + evts.size() + ", expected=" + evtCnt + + ", evts=" + evts + ']'; + + return evts; + } + + assert false; + + return null; + } + + /** + * @param timeout Timeout. + * @param job Job. + * @return Task future. + * @throws Exception If failed. + */ + private ComputeTaskFuture<?> generateEvents(@Nullable Long timeout, ComputeJob job) throws Exception { + IgniteCompute comp = ignite.compute().enableAsync(); + + if (timeout == null) + comp.execute(GridAllEventsTestTask.class.getName(), job); + else + comp.withTimeout(timeout).execute(GridAllEventsTestTask.class.getName(), job); + + return comp.future(); + } + + /** + * + */ + private static class CustomEventFilter implements IgnitePredicate<IgniteEvent> { + /** */ + private final String taskName; + + /** */ + private final long tstamp; + + /** + * @param taskName Task name. + * @param tstamp Timestamp. + */ + CustomEventFilter(String taskName, long tstamp) { + assert taskName != null; + assert tstamp > 0; + + this.taskName = taskName; + this.tstamp = tstamp; + } + + /** {@inheritDoc} */ + @Override public boolean apply(IgniteEvent evt) { + if (evt.timestamp() >= tstamp) { + if (evt instanceof IgniteTaskEvent) + return taskName.equals(((IgniteTaskEvent)evt).taskName()); + else if (evt instanceof IgniteJobEvent) + return taskName.equals(((IgniteJobEvent)evt).taskName()); + else if (evt instanceof IgniteDeploymentEvent) + return taskName.equals(((IgniteDeploymentEvent)evt).alias()); + else if (evt instanceof IgniteCheckpointEvent) + return true; + } + + return false; + } + } + + /** + * + */ + private static class GridAllEventsSuccessTestJob extends ComputeJobAdapter { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession taskSes; + + /** {@inheritDoc} */ + @Override public String execute() throws IgniteCheckedException { + assert taskSes != null; + + taskSes.saveCheckpoint("testCheckpoint", "TestState"); + taskSes.removeCheckpoint("testCheckpoint"); + + return "GridAllEventsSuccessTestJob-test-event-success."; + } + } + + /** + * + */ + private static class GridAllEventsFailTestJob extends ComputeJobAdapter { + /** {@inheritDoc} */ + @Override public String execute() { + throw new RuntimeException("GridAllEventsFailTestJob expected test exception."); + } + } + + /** + */ + private static class GridAllEventsTimeoutTestJob extends ComputeJobAdapter { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public String execute() { + try { + while (!isCancelled()) + Thread.sleep(5000); + } + catch (InterruptedException ignored) { + if (log.isInfoEnabled()) + log.info("GridAllEventsTimeoutTestJob was interrupted."); + + return "GridAllEventsTimeoutTestJob-test-event-timeout."; + } + + return "GridAllEventsTimeoutTestJob-test-event-timeout."; + } + } + + /** + * + */ + private static class GridAllCheckpointEventsTestJob extends ComputeJobAdapter { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession taskSes; + + /** {@inheritDoc} */ + @Override public String execute() throws IgniteCheckedException { + assert taskSes != null; + + taskSes.saveCheckpoint("testAllCheckpoint", "CheckpointTestState"); + taskSes.loadCheckpoint("testAllCheckpoint"); + taskSes.removeCheckpoint("testAllCheckpoint"); + + return "GridAllCheckpointEventsSuccess-test-all-checkpoint-event-success."; + } + } + + /** + * + */ + @ComputeTaskSessionFullSupport + private static class GridAllEventsTestTask extends ComputeTaskSplitAdapter<Object, Object> { + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException { + return Collections.singleton((ComputeJob)arg); + } + + /** {@inheritDoc} */ + @Override public Serializable reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results != null; + assert results.size() == 1; + + return (Serializable)results; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageRuntimeConfigurationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageRuntimeConfigurationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageRuntimeConfigurationSelfTest.java new file mode 100644 index 0000000..073cead --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageRuntimeConfigurationSelfTest.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; +import org.junit.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Tests for runtime events configuration. + */ +public class GridEventStorageRuntimeConfigurationSelfTest extends GridCommonAbstractTest { + /** */ + private int[] inclEvtTypes; + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setIncludeEventTypes(inclEvtTypes); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testEnableWithDefaults() throws Exception { + inclEvtTypes = null; + + try { + Ignite g = startGrid(); + + final AtomicInteger cnt = new AtomicInteger(); + + g.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + cnt.incrementAndGet(); + + return true; + } + }, EVT_TASK_STARTED); + + g.compute().run(F.noop()); + + assertEquals(0, cnt.get()); + + g.events().enableLocal(EVT_TASK_STARTED); + + g.compute().run(F.noop()); + + assertEquals(1, cnt.get()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testEnableWithIncludes() throws Exception { + inclEvtTypes = new int[] { EVT_TASK_STARTED, EVT_TASK_FINISHED }; + + try { + Ignite g = startGrid(); + + final AtomicInteger cnt = new AtomicInteger(); + + g.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + cnt.incrementAndGet(); + + return true; + } + }, EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_JOB_STARTED); + + g.compute().run(F.noop()); + + assertEquals(2, cnt.get()); + + g.events().enableLocal(EVT_TASK_FINISHED, EVT_JOB_STARTED); + + g.compute().run(F.noop()); + + assertEquals(5, cnt.get()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testDisableWithIncludes() throws Exception { + inclEvtTypes = null; + + try { + Ignite g = startGrid(); + + g.events().enableLocal(EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_JOB_STARTED); + + final AtomicInteger cnt = new AtomicInteger(); + + g.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + cnt.incrementAndGet(); + + return true; + } + }, EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_JOB_STARTED); + + g.compute().run(F.noop()); + + assertEquals(3, cnt.get()); + + g.events().disableLocal(EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_JOB_FAILED); + + g.compute().run(F.noop()); + + assertEquals(4, cnt.get()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testEnableDisable() throws Exception { + inclEvtTypes = null; + + try { + Ignite g = startGrid(); + + IgniteEvents evts = g.events(); + + evts.enableLocal(EVT_CACHE_OBJECT_PUT); + + evts.disableLocal(EVT_CACHE_OBJECT_PUT); + + for (int evtType : evts.enabledEvents()) + assertFalse(evtType == EVT_CACHE_OBJECT_PUT); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("UnusedDeclaration") + public void testInvalidTypes() throws Exception { + inclEvtTypes = new int[]{EVT_TASK_STARTED}; + + try (Ignite g = startGrid()) { + assertTrue(g.events().isEnabled(EVT_TASK_STARTED)); + + try { + g.events().isEnabled(-13); + + fail("Expected IgniteCheckedException"); + } + catch (IllegalArgumentException e) { + info("Caught expected exception: " + e); + } + } + finally { + stopAllGrids(); + } + + inclEvtTypes = new int[]{-13}; + + try (Ignite g = startGrid()) { + fail("Expected IgniteCheckedException"); + } + catch (IgniteCheckedException e) { + info("Caught expected exception: " + e); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testGetters() throws Exception { + inclEvtTypes = new int[]{EVT_TASK_STARTED, EVT_TASK_FINISHED, 30000}; + + try { + Ignite g = startGrid(); + + assertEqualsWithoutOrder(inclEvtTypes, getEnabledEvents(g)); + assertEqualsWithoutOrder(inclEvtTypes, getEnabledEvents(1013, g, 30000)); + + g.events().enableLocal(20000, EVT_TASK_STARTED, EVT_CACHE_ENTRY_CREATED); + + assertEqualsWithoutOrder( + new int[] {EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_CACHE_ENTRY_CREATED, 20000, 30000}, + getEnabledEvents(g)); + + assertEqualsWithoutOrder( + new int[] {EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_CACHE_ENTRY_CREATED, 20000, 30000}, + getEnabledEvents(1013, g, 20000, 30000)); + + g.events().disableLocal(20000, 20001, 30000, EVT_TASK_STARTED, EVT_CACHE_ENTRY_CREATED); + + assertEqualsWithoutOrder( + new int[] {EVT_TASK_FINISHED, EVT_TASK_STARTED, 30000}, + getEnabledEvents(g)); + + assertEqualsWithoutOrder( + new int[] {EVT_TASK_FINISHED, EVT_TASK_STARTED, 30000}, + getEnabledEvents(1013, g, 20000, 30000)); + + int[] a = new int[1013]; + + for (int i = 0; i < 1000; i++) + a[i] = 1001 + i; + + a[1000] = EVT_TASK_TIMEDOUT; + a[1001] = EVT_TASK_STARTED; + + randomShuffle(a, 1002); + + int[] a0 = Arrays.copyOf(a, a.length + 1); + + g.events().enableLocal(Arrays.copyOf(a, 1002)); + + a0[1002] = EVT_TASK_FINISHED; + a0[1003] = 30000; + + assertEqualsWithoutOrder(Arrays.copyOf(a0, 1004), getEnabledEvents(g)); + assertEqualsWithoutOrder(Arrays.copyOf(a0, 1004), getEnabledEvents(2013, g, 30000)); + + g.events().disableLocal(Arrays.copyOf(a, 1002)); + + assertEqualsWithoutOrder( + new int[] {EVT_TASK_STARTED, EVT_TASK_FINISHED, 30000}, + getEnabledEvents(g)); + + assertEqualsWithoutOrder( + new int[] {EVT_TASK_STARTED, EVT_TASK_FINISHED, 30000}, + getEnabledEvents(1013, g, 20000, 30000)); + } + finally { + stopAllGrids(); + } + } + + /** + * @param a Array. + * @param len Prefix length. + */ + private void randomShuffle(int[] a, int len) { + Random rand = new Random(); + + for (int i = len - 1; i > 0; i--) { + int j = rand.nextInt(i); + + int t = a[i]; + a[i] = a[j]; + a[j] = t; + } + } + + /** + * @param a First array. + * @param b Second array. + */ + private void assertEqualsWithoutOrder(int[] a, int[] b) { + assertNotNull(a); + assertNotNull(b); + assertEquals(a.length, b.length); + + int[] a0 = Arrays.copyOf(a, a.length); + int[] b0 = Arrays.copyOf(a, a.length); + + Arrays.sort(a0); + Arrays.sort(b0); + + Assert.assertArrayEquals(a0, b0); + } + + /** + * @param g Grid. + * @return Enabled events. + */ + private int[] getEnabledEvents(Ignite g) { + return g.events().enabledEvents(); + } + + /** + * @param limit Loop limit. + * @param g Grid. + * @param customTypes Array of event types. + * @return Enabled events counted with loop (1..limit) and checks of custom types. + */ + private int[] getEnabledEvents(int limit, Ignite g, int... customTypes) { + Collection<Integer> res = new HashSet<>(); + + IgniteEvents evts = g.events(); + + for (int i = 1; i <= limit; i++) { + if (evts.isEnabled(i)) + res.add(i); + } + + if (customTypes != null) { + for (int i : customTypes) + if (evts.isEnabled(i)) + res.add(i); + } + + return U.toIntArray(res); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageSelfTest.java new file mode 100644 index 0000000..4c3ad52 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageSelfTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Event storage tests. + * + * Note: + * Test based on events generated by test task execution. + * Filter class must be static because it will be send to remote host in + * serialized form. + */ +@GridCommonTest(group = "Kernal Self") +public class GridEventStorageSelfTest extends GridCommonAbstractTest { + /** First grid. */ + private static Ignite ignite1; + + /** Second grid. */ + private static Ignite ignite2; + + /** */ + public GridEventStorageSelfTest() { + super(/*start grid*/false); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ignite1 = startGrid(1); + ignite2 = startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception In case of error. + */ + public void testAddRemoveGlobalListener() throws Exception { + IgnitePredicate<IgniteEvent> lsnr = new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + info("Received local event: " + evt); + + return true; + } + }; + + ignite1.events().localListen(lsnr, EVTS_ALL_MINUS_METRIC_UPDATE); + + assert ignite1.events().stopLocalListen(lsnr); + } + + /** + * @throws Exception In case of error. + */ + public void testAddRemoveDiscoListener() throws Exception { + IgnitePredicate<IgniteEvent> lsnr = new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + info("Received local event: " + evt); + + return true; + } + }; + + ignite1.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); + + assert ignite1.events().stopLocalListen(lsnr); + assert !ignite1.events().stopLocalListen(lsnr); + } + + /** + * @throws Exception In case of error. + */ + public void testLocalNodeEventStorage() throws Exception { + TestEventListener lsnr = new TestEventListener(); + + IgnitePredicate<IgniteEvent> filter = new TestEventFilter(); + + // Check that two same listeners may be added. + ignite1.events().localListen(lsnr, EVT_TASK_STARTED); + ignite1.events().localListen(lsnr, EVT_TASK_STARTED); + + // Execute task. + generateEvents(ignite1); + + assert lsnr.getCounter() == 1; + + Collection<IgniteEvent> evts = ignite1.events().localQuery(filter); + + assert evts != null; + assert evts.size() == 1; + + // Execute task. + generateEvents(ignite1); + + // Check that listener has been removed. + assert lsnr.getCounter() == 2; + + // Check that no problems with nonexistent listeners. + assert ignite1.events().stopLocalListen(lsnr); + assert !ignite1.events().stopLocalListen(lsnr); + + // Check for events from local node. + evts = ignite1.events().localQuery(filter); + + assert evts != null; + assert evts.size() == 2; + + // Check for events from empty remote nodes collection. + try { + events(ignite1.cluster().forPredicate(F.<ClusterNode>alwaysFalse())).remoteQuery(filter, 0); + } + catch (ClusterGroupEmptyException ignored) { + // No-op + } + } + + /** + * @throws Exception In case of error. + */ + public void testRemoteNodeEventStorage() throws Exception { + IgnitePredicate<IgniteEvent> filter = new TestEventFilter(); + + generateEvents(ignite2); + + ClusterGroup prj = ignite1.cluster().forPredicate(F.remoteNodes(ignite1.cluster().localNode().id())); + + Collection<IgniteEvent> evts = events(prj).remoteQuery(filter, 0); + + assert evts != null; + assert evts.size() == 1; + } + + /** + * @throws Exception In case of error. + */ + public void testRemoteAndLocalNodeEventStorage() throws Exception { + IgnitePredicate<IgniteEvent> filter = new TestEventFilter(); + + generateEvents(ignite1); + + Collection<IgniteEvent> evts = ignite1.events().remoteQuery(filter, 0); + Collection<IgniteEvent> locEvts = ignite1.events().localQuery(filter); + Collection<IgniteEvent> remEvts = + events(ignite1.cluster().forPredicate(F.remoteNodes(ignite1.cluster().localNode().id()))).remoteQuery(filter, 0); + + assert evts != null; + assert locEvts != null; + assert remEvts != null; + assert evts.size() == 1; + assert locEvts.size() == 1; + assert remEvts.isEmpty(); + } + + /** + * Create events in grid. + * + * @param ignite Grid. + * @throws IgniteCheckedException In case of error. + */ + private void generateEvents(Ignite ignite) throws IgniteCheckedException { + ignite.compute().localDeployTask(GridEventTestTask.class, GridEventTestTask.class.getClassLoader()); + + ignite.compute().execute(GridEventTestTask.class.getName(), null); + } + + /** + * Test task. + */ + private static class GridEventTestTask extends ComputeTaskSplitAdapter<Object, Object> { + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException { + return Collections.singleton(new GridEventTestJob()); + } + + /** {@inheritDoc} */ + @Override public Serializable reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results != null; + assert results.size() == 1; + + return results.get(0).getData(); + } + } + + /** + * Test job. + */ + private static class GridEventTestJob extends ComputeJobAdapter { + /** {@inheritDoc} */ + @Override public String execute() throws IgniteCheckedException { + return "GridEventTestJob-test-event."; + } + } + + /** + * Test event listener. + */ + private class TestEventListener implements IgnitePredicate<IgniteEvent> { + /** Event counter. */ + private AtomicInteger cnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public boolean apply(IgniteEvent evt) { + info("Event storage event: evt=" + evt); + + // Count only started tasks. + if (evt.type() == EVT_TASK_STARTED) + cnt.incrementAndGet(); + + return true; + } + + /** + * @return Event counter value. + */ + public int getCounter() { + return cnt.get(); + } + + /** + * Clear event counter. + */ + public void clearCounter() { + cnt.set(0); + } + } + + /** + * Test event filter. + */ + private static class TestEventFilter implements IgnitePredicate<IgniteEvent> { + /** {@inheritDoc} */ + @Override public boolean apply(IgniteEvent evt) { + // Accept only predefined TASK_STARTED events. + return evt.type() == EVT_TASK_STARTED; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridExecutorServiceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridExecutorServiceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridExecutorServiceTest.java new file mode 100644 index 0000000..ac20e67 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridExecutorServiceTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.executor.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Grid distributed executor test. + */ +@GridCommonTest(group = "Thread Tests") +public class GridExecutorServiceTest extends GridCommonAbstractTest { + /** */ + public GridExecutorServiceTest() { + super(true); + } + + /** + * @throws Exception Thrown in case of test failure. + */ + public void testExecute() throws Exception { + Ignite ignite = G.ignite(getTestGridName()); + + ExecutorService srvc = createExecutorService(ignite); + + srvc.execute(new Runnable() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public void run() { + System.out.println("Test message."); + + assert this.ignite != null; + } + }); + + srvc.execute(new TestRunnable()); + + srvc.shutdown(); + } + + /** + * @throws Exception Thrown in case of test failure. + */ + public void testSubmit() throws Exception { + Ignite ignite = G.ignite(getTestGridName()); + + ExecutorService srvc = createExecutorService(ignite); + + Future<?> fut = srvc.submit(new TestRunnable()); + + Object res = fut.get(); + + info("Default Runnable result:" + res); + + assert res == null : "Failed to get valid default result for submitted Runnable: " + res; + + String val = "test-value"; + + fut = srvc.submit(new TestRunnable(), val); + + res = fut.get(); + + info("Defined Runnable result:" + res); + + assert val.equals(res) : "Failed to get valid predefined result for submitted Runnable: " + res; + + fut = srvc.submit(new TestCallable<>(val)); + + res = fut.get(); + + info("Callable result:" + res); + + assert val.equals(res) : "Failed to get valid result for submitted Callable: " + res; + + srvc.shutdown(); + } + + /** + * @throws Exception Thrown in case of test failure. + */ + public void testSubmitWithFutureTimeout() throws Exception { + Ignite ignite = G.ignite(getTestGridName()); + + ExecutorService srvc = createExecutorService(ignite); + + Future<Integer> fut = srvc.submit(new TestCallable<>(3000)); // Just sleep for 3 seconds. + + boolean ok = true; + + try { + fut.get(1, TimeUnit.SECONDS); + + ok = false; + } + catch (TimeoutException e) { + info("Task timeout elapsed: " + e.getMessage()); + } + + assert ok : "Timeout must be thrown."; + + srvc.shutdown(); + } + + /** + * @throws Exception Thrown in case of test failure. + */ + @SuppressWarnings("TooBroadScope") + public void testInvokeAll() throws Exception { + Ignite ignite = G.ignite(getTestGridName()); + + ExecutorService srvc = createExecutorService(ignite); + + Collection<Callable<String>> cmds = new ArrayList<>(2); + + String val1 = "test-value-1"; + String val2 = "test-value-2"; + + cmds.add(new TestCallable<>(val1)); + cmds.add(new TestCallable<>(val2)); + + List<Future<String>> futs = srvc.invokeAll(cmds); + + assert futs != null; + assert futs.size() == 2; + + String res1 = futs.get(0).get(); + String res2 = futs.get(1).get(); + + assert val1.equals(res1) : "Failed to get valid result for first command: " + res1; + assert val2.equals(res2) : "Failed to get valid result for second command: " + res2; + + srvc.shutdown(); + } + + /** + * @throws Exception Thrown in case of test failure. + */ + @SuppressWarnings("TooBroadScope") + public void testInvokeAllWithTimeout() throws Exception { + Ignite ignite = G.ignite(getTestGridName()); + + ExecutorService srvc = createExecutorService(ignite); + + Collection<Callable<Integer>> cmds = new ArrayList<>(); + + cmds.add(new TestCallable<>(3000)); // Just sleeps for 3 seconds. + cmds.add(new TestCallable<>(3000)); // Just sleeps for 3 seconds. + + List<Future<Integer>> fut = srvc.invokeAll(cmds, 1, TimeUnit.SECONDS); + + assert fut != null; + assert fut.size() == 2; + + boolean ok = true; + + try { + fut.get(0).get(); + + ok = false; + } + catch (CancellationException e) { + info("First timeout task is cancelled: " + e.getMessage()); + } + + assert ok : "First task must be cancelled."; + + try { + fut.get(1).get(); + + ok = false; + } + catch (CancellationException e) { + info("Second timeout task is cancelled: " + e.getMessage()); + } + + assert ok : "Second task must be cancelled."; + + srvc.shutdown(); + } + + /** + * @throws Exception Thrown in case of test failure. + */ + @SuppressWarnings("TooBroadScope") + public void testInvokeAny() throws Exception { + Ignite ignite = G.ignite(getTestGridName()); + + ExecutorService srvc = createExecutorService(ignite); + + Collection<Callable<String>> cmds = new ArrayList<>(2); + + String val1 = "test-value-1"; + String val2 = "test-value-2"; + + cmds.add(new TestCallable<>(val1)); + cmds.add(new TestCallable<>(val2)); + + String res = srvc.invokeAny(cmds); + + info("Result: " + res); + + assert val1.equals(res) : "Failed to get valid result: " + res; + + srvc.shutdown(); + } + + /** + * @throws Exception Thrown in case of test failure. + */ + @SuppressWarnings("TooBroadScope") + public void testInvokeAnyWithTimeout() throws Exception { + Ignite ignite = G.ignite(getTestGridName()); + + ExecutorService srvc = createExecutorService(ignite); + + Collection<Callable<Integer>> timeoutCmds = new ArrayList<>(2); + + timeoutCmds.add(new TestCallable<>(5000)); + timeoutCmds.add(new TestCallable<>(5000)); + + boolean ok = true; + + try { + srvc.invokeAny(timeoutCmds, 1, TimeUnit.SECONDS); + + ok = false; + } + catch (TimeoutException e) { + info("Task timeout elapsed: " + e.getMessage()); + } + + assert ok : "Timeout must be thrown."; + + srvc.shutdown(); + } + + /** + * @param ignite Grid instance. + * @return Thrown in case of test failure. + */ + private ExecutorService createExecutorService(Ignite ignite) { + assert ignite != null; + + return new GridExecutorService((ClusterGroupAdapter) ignite, log()); + } + + /** + * @param <T> Type of the {@link Callable} argument. + */ + private static class TestCallable<T> implements Callable<T>, Serializable { + /** */ + private T data; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * @param data Data. + */ + TestCallable(T data) { + this.data = data; + } + + /** {@inheritDoc} */ + @Override public T call() throws Exception { + System.out.println("Test callable message."); + + assert ignite != null; + + if (data instanceof Integer) + Thread.sleep((Integer)data); + + return data; + } + } + + /** */ + private static class TestRunnable implements Runnable, Serializable { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public void run() { + System.out.println("Test Runnable message."); + + assert ignite != null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridExplicitImplicitDeploymentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridExplicitImplicitDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridExplicitImplicitDeploymentSelfTest.java new file mode 100644 index 0000000..f19622e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridExplicitImplicitDeploymentSelfTest.java @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * + */ +@GridCommonTest(group = "Kernal Self") +public class GridExplicitImplicitDeploymentSelfTest extends GridCommonAbstractTest { + /** */ + public GridExplicitImplicitDeploymentSelfTest() { + super(/*start grid*/false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + // Override P2P configuration to exclude Task and Job classes + cfg.setPeerClassLoadingLocalClassPathExclude(GridDeploymentResourceTestTask.class.getName(), + GridDeploymentResourceTestJob.class.getName()); + + cfg.setDeploymentMode(IgniteDeploymentMode.ISOLATED); + + return cfg; + } + + /** + * @throws Exception If test failed. + */ + public void testImplicitDeployLocally() throws Exception { + execImplicitDeployLocally(true, true, true); + } + + /** + * @throws Exception If test failed. + */ + public void testImplicitDeployP2P() throws Exception { + execImplicitDeployP2P(true, true, true); + } + + /** + * @throws Exception If test failed. + */ + public void testExplicitDeployLocally() throws Exception { + execExplicitDeployLocally(true, true, true); + } + + /** + * @throws Exception If test failed. + */ + public void testExplicitDeployP2P() throws Exception { + execExplicitDeployP2P(true, true, true); + } + + /** + * @param ignite Grid. + */ + @SuppressWarnings({"CatchGenericClass"}) + private void stopGrid(Ignite ignite) { + try { + if (ignite != null) + G.stop(ignite.name(), true); + } + catch (Throwable e) { + error("Got error when stopping grid.", e); + } + } + + /** + * @param byCls If {@code true} than executes task by Class. + * @param byTask If {@code true} than executes task instance. + * @param byName If {@code true} than executes task by class name. + * @throws Exception If test failed. + */ + @SuppressWarnings("unchecked") + private void execExplicitDeployLocally(boolean byCls, boolean byTask, boolean byName) throws Exception { + Ignite ignite = null; + + try { + ignite = startGrid(); + + // Explicit Deployment. Task execution should return 0. + // Say resource class loader - different to task one. + ClassLoader ldr1 = new GridTestClassLoader( + Collections.singletonMap("testResource", "1"), + getClass().getClassLoader()); + + // Assume that users task and job were loaded with this class loader + ClassLoader ldr2 = new GridTestClassLoader( + Collections.singletonMap("testResource", "2"), + getClass().getClassLoader(), + GridDeploymentResourceTestTask.class.getName(), + GridDeploymentResourceTestJob.class.getName() + ); + + info("Loader1: " + ldr1); + info("Loader2: " + ldr2); + + Class<? extends ComputeTask<String, Integer>> taskCls = (Class<? extends ComputeTask<String, Integer>>) + ldr2.loadClass(GridDeploymentResourceTestTask.class.getName()); + + // Check auto-deploy. It should pick up resource class loader. + if (byCls) { + ignite.compute().localDeployTask(taskCls, ldr1); + + Integer res = ignite.compute().execute(taskCls, null); + + assert res != null; + assert res == 2 : "Invalid response: " + res; + } + + if (byTask) { + ignite.compute().localDeployTask(taskCls, ldr1); + + Integer res = ignite.compute().execute(taskCls.newInstance(), null); + + assert res != null; + assert res == 2 : "Invalid response: " + res; + } + + if (byName) { + ignite.compute().localDeployTask(taskCls, ldr1); + + Integer res = (Integer) ignite.compute().execute(taskCls.getName(), null); + + assert res != null; + assert res == 1 : "Invalid response: " + res; + } + } + finally { + stopGrid(ignite); + } + } + + /** + * @param byCls If {@code true} than executes task by Class. + * @param byTask If {@code true} than executes task instance. + * @param byName If {@code true} than executes task by class name. + * @throws Exception If test failed. + */ + @SuppressWarnings("unchecked") + private void execImplicitDeployLocally(boolean byCls, boolean byTask, boolean byName) throws Exception { + Ignite ignite = null; + + try { + ignite = startGrid(); + + // First task class loader. + ClassLoader ldr1 = new GridTestClassLoader( + Collections.singletonMap("testResource", "1"), + getClass().getClassLoader(), + GridDeploymentResourceTestTask.class.getName(), + GridDeploymentResourceTestJob.class.getName() + ); + + // Second task class loader + ClassLoader ldr2 = new GridTestClassLoader( + Collections.singletonMap("testResource", "2"), + getClass().getClassLoader(), + GridDeploymentResourceTestTask.class.getName(), + GridDeploymentResourceTestJob.class.getName() + ); + + // The same name but different classes/ class loaders. + Class<? extends ComputeTask<String, Integer>> taskCls1 = (Class<? extends ComputeTask<String, Integer>>) + ldr1.loadClass(GridDeploymentResourceTestTask.class.getName()); + + Class<? extends ComputeTask<String, Integer>> taskCls2 = (Class<? extends ComputeTask<String, Integer>>) + ldr2.loadClass(GridDeploymentResourceTestTask.class.getName()); + + if (byCls) { + Integer res1 = ignite.compute().execute(taskCls1, null); + Integer res2 = ignite.compute().execute(taskCls2, null); + + assert res1 != null; + assert res2 != null; + + assert res1 == 1 : "Invalid res1: " + res1; + assert res2 == 2 : "Invalid res2: " + res2; + } + + if (byTask) { + Integer res1 = ignite.compute().execute(taskCls1.newInstance(), null); + Integer res2 = ignite.compute().execute(taskCls2.newInstance(), null); + + assert res1 != null; + assert res2 != null; + + assert res1 == 1 : "Invalid res1: " + res1; + assert res2 == 2 : "Invalid res2: " + res2; + } + + if (byName) { + ignite.compute().localDeployTask(taskCls1, ldr1); + + Integer res1 = (Integer) ignite.compute().execute(taskCls1.getName(), null); + + ignite.compute().localDeployTask(taskCls2, ldr2); + + Integer res2 = (Integer) ignite.compute().execute(taskCls2.getName(), null); + + assert res1 != null; + assert res2 != null; + + assert res1 == 1 : "Invalid res1: " + res1; + assert res2 == 2 : "Invalid res2: " + res2; + } + } + finally { + stopGrid(ignite); + } + } + + /** + * @param byCls If {@code true} than executes task by Class. + * @param byTask If {@code true} than executes task instance. + * @param byName If {@code true} than executes task by class name. + * @throws Exception If test failed. + */ + @SuppressWarnings("unchecked") + private void execExplicitDeployP2P(boolean byCls, boolean byTask, boolean byName) throws Exception { + Ignite ignite1 = null; + Ignite ignite2 = null; + + try { + ignite1 = startGrid(1); + ignite2 = startGrid(2); + + ClassLoader ldr1 = new GridTestClassLoader( + Collections.singletonMap("testResource", "1"), + getClass().getClassLoader(), + GridDeploymentResourceTestTask.class.getName(), + GridDeploymentResourceTestJob.class.getName() + ); + + ClassLoader ldr2 = new GridTestClassLoader( + Collections.singletonMap("testResource", "2"), + getClass().getClassLoader(), + GridDeploymentResourceTestTask.class.getName(), + GridDeploymentResourceTestJob.class.getName() + ); + + Class<? extends ComputeTask<String, Integer>> taskCls = (Class<? extends ComputeTask<String, Integer>>) + ldr2.loadClass(GridDeploymentResourceTestTask.class.getName()); + + if (byCls) { + ignite1.compute().localDeployTask(taskCls, ldr1); + + // Even though the task is deployed with resource class loader, + // when we execute it, it will be redeployed with task class-loader. + Integer res = ignite1.compute().execute(taskCls, null); + + assert res != null; + assert res == 2 : "Invalid response: " + res; + } + + + if (byTask) { + ignite1.compute().localDeployTask(taskCls, ldr1); + + // Even though the task is deployed with resource class loader, + // when we execute it, it will be redeployed with task class-loader. + Integer res = ignite1.compute().execute(taskCls.newInstance(), null); + + assert res != null; + assert res == 2 : "Invalid response: " + res; + } + + if (byName) { + ignite1.compute().localDeployTask(taskCls, ldr1); + + // Even though the task is deployed with resource class loader, + // when we execute it, it will be redeployed with task class-loader. + Integer res = (Integer) ignite1.compute().execute(taskCls.getName(), null); + + assert res != null; + assert res == 1 : "Invalid response: " + res; + } + } + finally { + stopGrid(ignite2); + stopGrid(ignite1); + } + } + + /** + * @param byCls If {@code true} than executes task by Class. + * @param byTask If {@code true} than executes task instance. + * @param byName If {@code true} than executes task by class name. + * @throws Exception If test failed. + */ + @SuppressWarnings("unchecked") + private void execImplicitDeployP2P(boolean byCls, boolean byTask, boolean byName) throws Exception { + Ignite ignite1 = null; + Ignite ignite2 = null; + + try { + ignite1 = startGrid(1); + ignite2 = startGrid(2); + + ClassLoader ldr1 = new GridTestClassLoader( + Collections.singletonMap("testResource", "1"), + getClass().getClassLoader(), + GridDeploymentResourceTestTask.class.getName(), + GridDeploymentResourceTestJob.class.getName() + ); + + ClassLoader ldr2 = new GridTestClassLoader( + Collections.singletonMap("testResource", "2"), + getClass().getClassLoader(), + GridDeploymentResourceTestTask.class.getName(), + GridDeploymentResourceTestJob.class.getName() + ); + + Class<? extends ComputeTask<String, Integer>> taskCls1 = (Class<? extends ComputeTask<String, Integer>>) + ldr1.loadClass(GridDeploymentResourceTestTask.class.getName()); + + Class<? extends ComputeTask<String, Integer>> taskCls2 = (Class<? extends ComputeTask<String, Integer>>) + ldr2.loadClass(GridDeploymentResourceTestTask.class.getName()); + + if (byCls) { + Integer res1 = ignite1.compute().execute(taskCls1, null); + Integer res2 = ignite1.compute().execute(taskCls2, null); + + assert res1 != null; + assert res2 != null; + + assert res1 == 1 : "Invalid res1: " + res1; + assert res2 == 2 : "Invalid res2: " + res2; + } + + if (byTask) { + Integer res1 = ignite1.compute().execute(taskCls1.newInstance(), null); + Integer res2 = ignite1.compute().execute(taskCls2.newInstance(), null); + + assert res1 != null; + assert res2 != null; + + assert res1 == 1 : "Invalid res1: " + res1; + assert res2 == 2 : "Invalid res2: " + res2; + } + + if (byName) { + ignite1.compute().localDeployTask(taskCls1, ldr1); + + Integer res1 = (Integer) ignite1.compute().execute(taskCls1.getName(), null); + + ignite1.compute().localDeployTask(taskCls2, ldr2); + + Integer res2 = (Integer) ignite1.compute().execute(taskCls2.getName(), null); + + assert res1 != null; + assert res2 != null; + + assert res1 == 1 : "Invalid res1: " + res1; + assert res2 == 2 : "Invalid res2: " + res2; + } + } + finally { + stopGrid(ignite1); + stopGrid(ignite2); + } + } + + /** + * We use custom name to avoid auto-deployment in the same VM. + */ + @SuppressWarnings({"PublicInnerClass"}) + @ComputeTaskName("GridDeploymentResourceTestTask") + public static class GridDeploymentResourceTestTask extends ComputeTaskAdapter<String, Integer> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) throws IgniteCheckedException { + Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size()); + + boolean ignoreLocNode = false; + + UUID locId = ignite.configuration().getNodeId(); + + if (subgrid.size() == 1) + assert subgrid.get(0).id().equals(locId) : "Wrong node id."; + else + ignoreLocNode = true; + + for (ClusterNode node : subgrid) { + // Ignore local node. + if (ignoreLocNode && node.id().equals(locId)) + continue; + + map.put(new GridDeploymentResourceTestJob(), node); + } + + return map; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return results.get(0).getData(); + } + } + + /** + * Simple job for this test. + */ + @SuppressWarnings({"PublicInnerClass"}) + public static final class GridDeploymentResourceTestJob extends ComputeJobAdapter { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Serializable execute() throws IgniteCheckedException { + if (log.isInfoEnabled()) + log.info("Executing grid job: " + this); + + try { + ClassLoader ldr = Thread.currentThread().getContextClassLoader(); + + if (log.isInfoEnabled()) + log.info("Loader (inside job): " + ldr); + + InputStream in = ldr.getResourceAsStream("testResource"); + + if (in != null) { + Reader reader = new InputStreamReader(in); + + try { + char res = (char)reader.read(); + + return Integer.parseInt(Character.toString(res)); + } + finally { + U.close(in, null); + } + } + + return null; + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to execute job.", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridFactoryVmShutdownTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFactoryVmShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFactoryVmShutdownTest.java new file mode 100644 index 0000000..7a99eed --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFactoryVmShutdownTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.util.concurrent.*; + +import static org.apache.ignite.IgniteState.*; + +/** + * Tests for {@link org.apache.ignite.Ignition}. + */ +public class GridFactoryVmShutdownTest { + /** + * + */ + private GridFactoryVmShutdownTest() { + // No-op. + } + + /** + * @param args Args (optional). + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + final ConcurrentMap<String, IgniteState> states = new ConcurrentHashMap<>(); + + G.addListener(new IgniteListener() { + @Override public void onStateChange(@Nullable String name, IgniteState state) { + if (state == STARTED) { + IgniteState state0 = states.put(maskNull(name), STARTED); + + assert state0 == null; + } + else { + assert state == STOPPED; + + boolean replaced = states.replace(maskNull(name), STARTED, STOPPED); + + assert replaced; + } + } + }); + + // Test with shutdown hook enabled and disabled. + // System.setProperty(GridSystemProperties.GG_NO_SHUTDOWN_HOOK, "true"); + + // Grid will start and add shutdown hook. + G.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override public void run() { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName("test1"); + + try { + G.start(cfg); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to start grid in shutdown hook.", e); + } + finally { + X.println("States: " + states); + } + } + })); + + System.exit(0); + } + + /** + * Masks {@code null} string. + * + * @param s String to mask. + * @return Mask value or string itself if it is not {@code null}. + */ + private static String maskNull(String s) { + return s != null ? s : "null-mask-8AE34BF8"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridFailedInputParametersSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailedInputParametersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailedInputParametersSelfTest.java new file mode 100644 index 0000000..0abb6f8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailedInputParametersSelfTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Test for invalid input parameters. + */ +@GridCommonTest(group = "Kernal Self") +public class GridFailedInputParametersSelfTest extends GridCommonAbstractTest { + /** */ + private static Ignite ignite; + + /** */ + public GridFailedInputParametersSelfTest() { + super(/*start grid*/true); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite = G.ignite(getTestGridName()); + } + + /** + * @throws Exception Thrown in case of any errors. + */ + public void testAddEventLocalListener() throws Exception { + try { + ignite.events().localListen(null, EVTS_ALL); + + assert false : "Null listener can't be added."; + } + catch (NullPointerException ignored) { + // No-op. + } + } + + /** + * @throws Exception Thrown in case of any errors. + */ + public void testRemoveEventLocalListener() throws Exception { + try { + ignite.events().stopLocalListen(null); + + assert false : "Null listener can't be removed."; + } + catch (NullPointerException ignored) { + // No-op. + } + } + + /** + * @throws Exception Thrown in case of any errors. + */ + public void testAddDiscoveryListener() throws Exception { + try { + ignite.events().localListen(null, EVTS_ALL); + + assert false : "Null listener can't be added."; + } + catch (NullPointerException ignored) { + // No-op. + } + } + + /** + * @throws Exception Thrown in case of any errors. + */ + public void testRemoveDiscoveryListener() throws Exception { + try { + ignite.events().stopLocalListen(null); + + assert false : "Null listener can't be removed."; + } + catch (NullPointerException ignored) { + // No-op. + } + } + + /** + * @throws Exception Thrown in case of any errors. + */ + public void testGetNode() throws Exception { + try { + ignite.cluster().node(null); + + assert false : "Null nodeId can't be entered."; + } + catch (NullPointerException ignored) { + // No-op. + } + } + + /** + * @throws Exception Thrown in case of any errors. + */ + public void testPingNode() throws Exception { + try { + ignite.cluster().pingNode(null); + + assert false : "Null nodeId can't be entered."; + } + catch (NullPointerException ignored) { + // No-op. + } + } + + /** + * @throws Exception Thrown in case of any errors. + */ + public void testDeployTask() throws Exception { + try { + ignite.compute().localDeployTask(null, null); + + assert false : "Null task can't be entered."; + } + catch (NullPointerException ignored) { + // No-op. + } + + try { + ignite.compute().localDeployTask(null, null); + + assert false : "Null task can't be entered."; + } + catch (NullPointerException ignored) { + // No-op. + } + + // Check for exceptions. + ignite.compute().localDeployTask(GridTestTask.class, U.detectClassLoader(GridTestTask.class)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java new file mode 100644 index 0000000..025acdb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.failover.*; +import org.apache.ignite.spi.failover.always.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Test failover and custom topology. Topology returns local node if remote node fails. + */ +@GridCommonTest(group = "Kernal Self") +public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest { + /** */ + private final AtomicInteger failCnt = new AtomicInteger(0); + + /** */ + private static final Object mux = new Object(); + + /** */ + public GridFailoverCustomTopologySelfTest() { + super(/*start Grid*/false); + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setNodeId(null); + + cfg.setFailoverSpi(new AlwaysFailoverSpi() { + /** {@inheritDoc} */ + @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top) { + failCnt.incrementAndGet(); + + return super.failover(ctx, top); + } + }); + + return cfg; + } + /** + * Tests that failover don't pick local node if it has been excluded from topology. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"WaitNotInLoop", "UnconditionalWait", "unchecked"}) + public void testFailoverTopology() throws Exception { + try { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + assert ignite1 != null; + assert ignite2 != null; + + ignite1.compute().localDeployTask(JobTask.class, JobTask.class.getClassLoader()); + + try { + ComputeTaskFuture<String> fut; + + synchronized(mux){ + IgniteCompute comp = ignite1.compute().enableAsync(); + + comp.execute(JobTask.class, null); + + fut = comp.future(); + + mux.wait(); + } + + stopAndCancelGrid(2); + + String res = fut.get(); + + info("Task result: " + res); + } + catch (IgniteCheckedException e) { + info("Got unexpected grid exception: " + e); + } + + info("Failed over: " + failCnt.get()); + + assert failCnt.get() == 1 : "Invalid fail over counter [expected=1, actual=" + failCnt.get() + ']'; + } + finally { + stopGrid(1); + + // Stopping stopped instance just in case. + stopGrid(2); + } + } + + /** */ + @SuppressWarnings("PublicInnerClass") + public static class JobTask extends ComputeTaskAdapter<String, String> { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) throws IgniteCheckedException { + assert ignite != null; + + UUID locNodeId = ignite.configuration().getNodeId(); + + assert locNodeId != null; + + if (log.isInfoEnabled()) + log.info("Mapping jobs [subgrid=" + subgrid + ", arg=" + arg + ']'); + + ClusterNode remoteNode = null; + + for (ClusterNode node : subgrid) { + if (!node.id().equals(locNodeId)) + remoteNode = node; + } + + return Collections.singletonMap(new ComputeJobAdapter(locNodeId) { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @SuppressWarnings("NakedNotify") + @Override public Serializable execute() throws IgniteCheckedException { + assert ignite != null; + + UUID nodeId = ignite.configuration().getNodeId(); + + assert nodeId != null; + + if (!nodeId.equals(argument(0))) { + try { + synchronized(mux) { + mux.notifyAll(); + } + + Thread.sleep(Integer.MAX_VALUE); + } + catch (InterruptedException e) { + throw new ComputeExecutionRejectedException("Expected interruption during execution.", e); + } + } + else + return "success"; + + throw new ComputeExecutionRejectedException("Expected exception during execution."); + } + }, remoteNode); + } + + /** {@inheritDoc} */ + @Override public String reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results.size() == 1; + + return results.get(0).getData(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverSelfTest.java new file mode 100644 index 0000000..6e845ea --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverSelfTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.failover.always.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Failover tests. + */ +@GridCommonTest(group = "Kernal Self") +public class GridFailoverSelfTest extends GridCommonAbstractTest { + /** Initial node that job has been mapped to. */ + private static final AtomicReference<ClusterNode> nodeRef = new AtomicReference<>(null); + + /** */ + public GridFailoverSelfTest() { + super(/*start Grid*/false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setFailoverSpi(new AlwaysFailoverSpi()); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testJobFail() throws Exception { + try { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + assert ignite1 != null; + assert ignite2 != null; + + Integer res = ignite1.compute().withTimeout(10000).execute(JobFailTask.class.getName(), "1"); + + assert res != null; + assert res == 1; + } + finally { + stopGrid(1); + stopGrid(2); + } + } + + /** + * + */ + @ComputeTaskSessionFullSupport + private static class JobFailTask implements ComputeTask<String, Object> { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) throws IgniteCheckedException { + ses.setAttribute("fail", true); + + nodeRef.set(subgrid.get(0)); + + return Collections.singletonMap(new ComputeJobAdapter(arg) { + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public Serializable execute() throws IgniteCheckedException { + boolean fail; + + UUID locId = ignite.configuration().getNodeId(); + + try { + fail = ses.<String, Boolean>waitForAttribute("fail", 0); + } + catch (InterruptedException e) { + throw new IgniteCheckedException("Got interrupted while waiting for attribute to be set.", e); + } + + if (fail) { + ses.setAttribute("fail", false); + + assert nodeRef.get().id().equals(locId); + + throw new IgniteCheckedException("Job exception."); + } + + assert !nodeRef.get().id().equals(locId); + + // This job does not return any result. + return Integer.parseInt(this.<String>argument(0)); + } + }, subgrid.get(0)); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, + List<ComputeJobResult> received) throws IgniteCheckedException { + if (res.getException() != null && !(res.getException() instanceof ComputeUserUndeclaredException)) { + assert res.getNode().id().equals(nodeRef.get().id()); + + return ComputeJobResultPolicy.FAILOVER; + } + + assert !res.getNode().id().equals(nodeRef.get().id()); + + return ComputeJobResultPolicy.REDUCE; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results.size() == 1; + + assert nodeRef.get() != null; + + assert !results.get(0).getNode().id().equals(nodeRef.get().id()) : + "Initial node and result one are the same (should be different)."; + + return results.get(0).getData(); + } + } +}