// Source: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeVisorAttributesSelfTest.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal;

import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.testframework.junits.common.*;

import java.util.*;

/**
 * Ensures that system properties required by Visor are always passed to node attributes,
 * regardless of the value of the 'includeProperties' configuration parameter.
 */
public class GridNodeVisorAttributesSelfTest extends GridCommonAbstractTest {
    /** System properties required by Visor. */
    private static final String[] SYSTEM_PROPS = new String[] {
        "java.version",
        "java.vm.name",
        "os.arch",
        "os.name",
        "os.version"
    };

    /** GridGain-specific properties required by Visor. */
    private static final String[] GG_PROPS = new String[] {
        "org.gridgain.jvm.pid"
    };

    /** System and environment properties to include (set by each test before the node is started). */
    private String[] inclProps;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        cfg.setIncludeProperties(inclProps);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();
    }

    /**
     * Start grid node and ensure that Visor-related node attributes are set properly.
     * <p>
     * Every property from {@link #SYSTEM_PROPS} must be present with the value of the matching
     * JVM system property; every property from {@link #GG_PROPS} must merely be present.
     *
     * @throws Exception If grid start failed.
     */
    private void startGridAndCheck() throws Exception {
        Ignite g = startGrid();

        Map<String, Object> attrs = g.cluster().localNode().attributes();

        for (String prop : SYSTEM_PROPS) {
            // JUnit assertions (not the 'assert' keyword) so the check also runs without -ea
            // and a failure names the offending property.
            assertTrue("Node attributes do not contain system property: " + prop, attrs.containsKey(prop));

            assertEquals("Unexpected value of node attribute: " + prop, System.getProperty(prop), attrs.get(prop));
        }

        for (String prop : GG_PROPS)
            assertTrue("Node attributes do not contain property: " + prop, attrs.containsKey(prop));
    }

    /**
     * Test with 'includeProperties' configuration parameter set to {@code null}.
     *
     * @throws Exception If failed.
     */
    public void testIncludeNull() throws Exception {
        inclProps = null;

        startGridAndCheck();
    }

    /**
     * Test with 'includeProperties' configuration parameter set to empty array.
     *
     * @throws Exception If failed.
     */
    @SuppressWarnings("ZeroLengthArrayAllocation")
    public void testIncludeEmpty() throws Exception {
        inclProps = new String[] {};

        startGridAndCheck();
    }

    /**
     * Test with 'includeProperties' configuration parameter set to array with some values.
     *
     * @throws Exception If failed.
     */
    public void testIncludeNonEmpty() throws Exception {
        inclProps = new String[] {"prop1", "prop2"};

        startGridAndCheck();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridNonHistoryMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridNonHistoryMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridNonHistoryMetricsSelfTest.java new file mode 100644 index 0000000..8ed0f72 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridNonHistoryMetricsSelfTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * + */ +public class GridNonHistoryMetricsSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMetricsHistorySize(5); + + cfg.setCacheConfiguration(); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testSingleTaskMetrics() throws Exception { + final Ignite ignite = grid(); + + ignite.compute().execute(new TestTask(), "testArg"); + + // Let metrics update twice. 
+ final CountDownLatch latch = new CountDownLatch(2); + + ignite.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_NODE_METRICS_UPDATED; + + latch.countDown(); + + return true; + } + }, EVT_NODE_METRICS_UPDATED); + + latch.await(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + ClusterNodeMetrics metrics = ignite.cluster().localNode().metrics(); + + return metrics.getTotalExecutedJobs() == 5; + } + }, 5000); + + ClusterNodeMetrics metrics = ignite.cluster().localNode().metrics(); + + info("Node metrics: " + metrics); + + assertEquals(5, metrics.getTotalExecutedJobs()); + assertEquals(0, metrics.getTotalCancelledJobs()); + assertEquals(0, metrics.getTotalRejectedJobs()); + } + + /** + * Test task. + */ + private static class TestTask extends ComputeTaskSplitAdapter<Object, Object> { + /** Logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Collection<? 
extends ComputeJob> split(int gridSize, Object arg) { + Collection<ComputeJob> refs = new ArrayList<>(gridSize*5); + + for (int i = 0; i < gridSize * 5; i++) + refs.add(new GridTestJob(arg.toString() + i + 1)); + + return refs; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return results; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java new file mode 100644 index 0000000..96c6efa --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java @@ -0,0 +1,768 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Abstract test for {@link org.apache.ignite.cluster.ClusterGroup} + */ +@SuppressWarnings("deprecation") +public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest implements Externalizable { + /** Waiting timeout. */ + private static final int WAIT_TIMEOUT = 30000; + + /** Utility static variable. */ + private static final AtomicInteger cnt = new AtomicInteger(0); + + /** Mutex. */ + private static final Object mux = new Object(); + + /** Projection. */ + private ClusterGroup prj; + + /** Runnable job. */ + private Runnable runJob = new TestRunnable(); + + /** Callable job. */ + private Callable<String> calJob = new TestCallable<>(); + + /** Closure job. */ + private IgniteClosure<String, String> clrJob = new IgniteClosure<String, String>() { + @Override public String apply(String s) { + return s; + } + + @Override public String toString() { + return "clrJob"; + } + }; + + /** Reducer. */ + private IgniteReducer<String, Object> rdc = new IgniteReducer<String, Object>() { + @Override public boolean collect(String e) { + return true; + } + + @Nullable @Override public Object reduce() { + return null; + } + + @Override public String toString() { + return "rdc"; + } + }; + + /** */ + protected GridProjectionAbstractTest() { + // No-op. + } + + /** + * @param startGrid Start grid flag. 
+ */ + protected GridProjectionAbstractTest(boolean startGrid) { + super(startGrid); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + prj = projection(); + + cnt.set(0); + } + + /** + * @return Projection. + */ + protected abstract ClusterGroup projection(); + + /** + * @return Local node ID. + */ + @Nullable protected abstract UUID localNodeId(); + + /** + * @return Remote nodes IDs. + */ + protected Collection<UUID> remoteNodeIds() { + return F.nodeIds(projection().forRemotes().nodes()); + } + + /** + * @return Projection size. + */ + private int projectionSize() { + int size = localNodeId() != null ? 1 : 0; + + size += remoteNodeIds().size(); + + assert size > 0; + + return size; + } + + /** + * @return Collection of projection node IDs. + */ + private Collection<UUID> projectionNodeIds() { + Collection<UUID> ids = new LinkedList<>(); + + UUID id = localNodeId(); + + if (id != null) + ids.add(id); + + ids.addAll(remoteNodeIds()); + + assert !ids.isEmpty(); + + return ids; + } + + /** + * Test for projection on not existing node IDs. + */ + public void testInvalidProjection() { + Collection<UUID> ids = new HashSet<>(); + + ids.add(UUID.randomUUID()); + ids.add(UUID.randomUUID()); + + ClusterGroup invalidPrj = prj.forNodeIds(ids); + + assertEquals(0, invalidPrj.nodes().size()); + } + + /** + * @throws Exception If test failed. + */ + public void testProjection() throws Exception { + assert prj != null; + + assert prj.ignite() != null; + assert prj.predicate() != null; + + int size = projectionSize(); + + assert prj.nodes().size() == size; + + Collection<UUID> nodeIds = projectionNodeIds(); + + for (ClusterNode node : prj.nodes()) + assert nodeIds.contains(node.id()); + } + + /** + * @throws Exception If test failed. 
+ */ + public void testRemoteNodes() throws Exception { + Collection<UUID> remoteNodeIds = remoteNodeIds(); + + UUID locNodeId = localNodeId(); + + int size = remoteNodeIds.size(); + + String name = "oneMoreGrid"; + + try { + Ignite g = startGrid(name); + + UUID excludedId = g.cluster().localNode().id(); + + assertEquals(size, prj.forRemotes().nodes().size()); + + for (ClusterNode node : prj.forRemotes().nodes()) { + UUID id = node.id(); + + assert !id.equals(locNodeId) && remoteNodeIds.contains(id) && !excludedId.equals(id); + } + } + finally { + stopGrid(name); + } + } + + /** + * @throws Exception If test failed. + */ + public void testRemoteProjection() throws Exception { + Collection<UUID> remoteNodeIds = remoteNodeIds(); + + ClusterGroup remotePrj = projection().forRemotes(); + + Collection<UUID> prjNodeIds = F.nodeIds(remotePrj.nodes()); + + assert prjNodeIds.size() == remoteNodeIds.size(); + + assert prjNodeIds.containsAll(remoteNodeIds()); + + assert !prjNodeIds.contains(localNodeId()); + + String name = "oneMoreGrid"; + + try { + Ignite g = startGrid(name); + + UUID excludedId = g.cluster().localNode().id(); + + assert !F.nodeIds(remotePrj.nodes()).contains(excludedId); + } + finally { + stopGrid(name); + } + } + + /** + * @throws Exception If test failed. 
+ */ + public void testExecution() throws Exception { + String name = "oneMoreGrid"; + + Collection<IgniteBiTuple<Ignite, IgnitePredicate<IgniteEvent>>> lsnrs = new LinkedList<>(); + + try { + final AtomicInteger cnt = new AtomicInteger(); + + Ignite g = startGrid(name); + + IgnitePredicate<IgniteEvent> lsnr; + + if (!Ignite.class.isAssignableFrom(projection().getClass())) { + g.events().localListen(lsnr = new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_JOB_STARTED; + + assert false; + + return true; + } + }, EVT_JOB_STARTED); + + lsnrs.add(F.t(g, lsnr)); + } + + for (ClusterNode node : prj.nodes()) { + g = G.ignite(node.id()); + + g.events().localListen(lsnr = new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_JOB_STARTED; + + synchronized (mux) { + cnt.incrementAndGet(); + + mux.notifyAll(); + } + + return true; + } + }, EVT_JOB_STARTED); + + lsnrs.add(F.t(g, lsnr)); + } + + run1(cnt); + run2(cnt); + + call1(cnt); + call2(cnt); + call3(cnt); + call4(cnt); + call5(cnt); + + forkjoin1(cnt); + forkjoin2(cnt); + + exec1(cnt); + exec2(cnt); + + executorService(cnt); + + checkActiveFutures(); + } + finally { + for (IgniteBiTuple<Ignite, IgnitePredicate<IgniteEvent>> t : lsnrs) + t.get1().events().stopLocalListen(t.get2(), EVT_JOB_STARTED); + + stopGrid(name); + } + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void run1(AtomicInteger cnt) throws Exception { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.broadcast(runJob); + + IgniteFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).broadcast(runJob); + + waitForValue(cnt, projectionSize()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. 
+ */ + private void run2(AtomicInteger cnt) throws Exception { + Collection<Runnable> jobs = F.asList(runJob); + + IgniteCompute comp = compute(prj).enableAsync(); + + comp.run(jobs); + + IgniteFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).run(jobs); + + waitForValue(cnt, jobs.size()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void call1(AtomicInteger cnt) throws Exception { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.broadcast(calJob); + + IgniteFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).broadcast(calJob); + + waitForValue(cnt, projectionSize()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void call2(AtomicInteger cnt) throws Exception { + IgniteCompute comp = compute(prj).enableAsync(); + + Collection<Callable<String>> jobs = F.asList(calJob); + + comp.call(jobs); + + IgniteFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).call(jobs); + + waitForValue(cnt, jobs.size()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void call3(AtomicInteger cnt) throws Exception { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.apply(clrJob, (String) null); + + IgniteFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).apply(clrJob, (String) null); + + waitForValue(cnt, 1); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void call4(AtomicInteger cnt) throws Exception { + Collection<String> args = F.asList("a", "b", "c"); + + IgniteCompute comp = compute(prj).enableAsync(); + + comp.apply(clrJob, args); + + IgniteFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).apply(clrJob, args); + + waitForValue(cnt, args.size()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. 
+ */ + private void call5(AtomicInteger cnt) throws Exception { + IgniteCompute comp = compute(prj).enableAsync(); + + comp.broadcast(new TestClosure(), "arg"); + + IgniteFuture<Collection<String>> fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + Collection<String> res = compute(prj).broadcast(new TestClosure(), "arg"); + + assertEquals(projectionSize(), res.size()); + + waitForValue(cnt, projectionSize()); + + for (String resStr : res) + assertEquals("arg", resStr); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void forkjoin1(AtomicInteger cnt) throws Exception { + Collection<String> args = F.asList("a", "b", "c"); + + IgniteCompute comp = compute(prj).enableAsync(); + + comp.apply(clrJob, args, rdc); + + IgniteFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).apply(clrJob, args, rdc); + + waitForValue(cnt, args.size()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void forkjoin2(AtomicInteger cnt) throws Exception { + Collection<Callable<String>> jobs = F.asList(calJob); + + IgniteCompute comp = compute(prj).enableAsync(); + + comp.call(jobs, rdc); + + IgniteFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).call(jobs, rdc); + + waitForValue(cnt, jobs.size()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void exec1(AtomicInteger cnt) throws Exception { + cnt.set(0); + + compute(prj).execute(TestTask.class.getName(), null); + + waitForValue(cnt, projectionSize()); + + cnt.set(0); + + compute(prj).execute(new TestTask(), null); + + waitForValue(cnt, projectionSize()); + + cnt.set(0); + + compute(prj).execute(TestTask.class, null); + + waitForValue(cnt, projectionSize()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. 
+ */ + private void exec2(AtomicInteger cnt) throws Exception { + cnt.set(0); + + compute(prj).withTimeout(WAIT_TIMEOUT).execute(TestTask.class.getName(), null); + + waitForValue(cnt, projectionSize()); + + cnt.set(0); + + compute(prj).withTimeout(WAIT_TIMEOUT).execute(new TestTask(), null); + + waitForValue(cnt, projectionSize()); + + cnt.set(0); + + compute(prj).withTimeout(WAIT_TIMEOUT).execute(TestTask.class, null); + + waitForValue(cnt, projectionSize()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void executorService(AtomicInteger cnt) throws Exception { + cnt.set(0); + + ExecutorService execSrvc = prj.ignite().executorService(prj); + + Future<String> fut = execSrvc.submit(new TestCallable<String>() { + @Override public String call() throws Exception { + return "submit1"; + } + }); + + waitForValue(cnt, 1); + + assertEquals("submit1", fut.get()); + + cnt.set(0); + + fut = execSrvc.submit(new TestRunnable(), "submit2"); + + waitForValue(cnt, 1); + + assertEquals("submit2", fut.get()); + + cnt.set(0); + + Future<?> runFut = execSrvc.submit(new TestRunnable()); + + waitForValue(cnt, 1); + + runFut.get(); + } + + /** + * @param fut Execution future. + * @throws InterruptedException Thrown if wait was interrupted. + */ + @SuppressWarnings({"UnconditionalWait"}) + private void waitForExecution(IgniteFuture fut) throws InterruptedException { + long sleep = 250; + + long threshold = System.currentTimeMillis() + WAIT_TIMEOUT; + + do synchronized (mux) { + mux.wait(sleep); + } + while (fut != null && !fut.isDone() && !fut.isCancelled() && threshold > System.currentTimeMillis()); + + assert fut == null || fut.isDone(); + } + + /** + * @param cnt Counter to check. + * @param val Value to check. + * @throws InterruptedException Thrown if wait was interrupted. 
+ */ + private void waitForValue(AtomicInteger cnt, int val) throws InterruptedException { + assert cnt != null; + assert val > 0; + + long threshold = System.currentTimeMillis() + WAIT_TIMEOUT; + + long time; + + while (threshold > (time = System.currentTimeMillis())) + synchronized (mux) { + if (cnt.get() == val) + break; + + mux.wait(threshold - time); + } + + assert cnt.get() == val; + } + + /** + * @throws Exception If test failed. + */ + private void checkActiveFutures() throws Exception { + IgniteCompute comp = compute(prj).enableAsync(); + + assertEquals(0, comp.activeTaskFutures().size()); + + cnt.set(0); + + Collection<ComputeTaskFuture<Object>> futsList = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + comp.call(new TestWaitCallable<Object>()); + + ComputeTaskFuture<Object> fut = comp.future(); + + assertFalse(fut.isDone()); + + Map<IgniteUuid, ComputeTaskFuture<Object>> futs = comp.activeTaskFutures(); + + assertEquals(i + 1, futs.size()); + + assertTrue(futs.containsKey(fut.getTaskSession().getId())); + + futsList.add(fut); + } + + synchronized (mux) { + cnt.incrementAndGet(); + + mux.notifyAll(); + } + + for (ComputeTaskFuture<Object> fut : futsList) + fut.get(); + + assertEquals(0, comp.activeTaskFutures().size()); + } + + /** + * Test closure. + */ + private static class TestClosure implements IgniteClosure<String, String> { + /** {@inheritDoc} */ + @Override public String apply(String s) { + return s; + } + } + + /** + * Test runnable. + */ + private static class TestRunnable implements Runnable, Serializable { + /** {@inheritDoc} */ + @Override public void run() { + // No-op. + } + } + + /** + * Test callable. + */ + private static class TestCallable<T> implements Callable<T>, Serializable { + /** {@inheritDoc} */ + @Nullable @Override public T call() throws Exception { + return null; + } + } + + /** + * Test callable. 
+ */ + private static class TestWaitCallable<T> implements Callable<T>, Serializable { + /** {@inheritDoc} */ + @Nullable @Override public T call() throws Exception { + synchronized (mux) { + while (cnt.get() == 0) + mux.wait(); + } + + return null; + } + } + + /** + * Test task. + */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestTask extends ComputeTaskSplitAdapter<String, Void> { + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteCheckedException { + Collection<ComputeJob> jobs = new HashSet<>(); + + for (int i = 0; i < gridSize; i++) + jobs.add(new TestJob()); + + return jobs; + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } + + /** + * Test job. + */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestJob extends ComputeJobAdapter { + /** {@inheritDoc} */ + @Nullable @Override public Object execute() throws IgniteCheckedException { + return null; + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java new file mode 100644 index 0000000..c3d4302 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Tests for {@link org.apache.ignite.cluster.ClusterGroup#forCache(String, String...)} method. + */ +public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE_NAME = "cache"; + + /** */ + private Ignite ignite; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setDiscoverySpi(discoverySpi()); + + if (gridName.equals(getTestGridName(0))) + cfg.setCacheConfiguration(cacheConfiguration(null)); + else if (gridName.equals(getTestGridName(1))) + cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME)); + else if (gridName.equals(getTestGridName(2)) || gridName.equals(getTestGridName(3))) + cfg.setCacheConfiguration(cacheConfiguration(null), cacheConfiguration(CACHE_NAME)); + else + cfg.setCacheConfiguration(); + + return cfg; + } + + /** + * @return Discovery SPI; + */ + private DiscoverySpi discoverySpi() { + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + return spi; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. 
+ */ + private CacheConfiguration cacheConfiguration(@Nullable String cacheName) { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setName(cacheName); + cfg.setCacheMode(PARTITIONED); + cfg.setBackups(1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 0; i < 5; i++) + startGrid(i); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ignite = grid(0); + } + + /** + * @throws Exception If failed. + */ + public void testProjectionForDefaultCache() throws Exception { + ClusterGroup prj = ignite.cluster().forCache(null); + + assert prj != null; + assert prj.nodes().size() == 3; + assert prj.nodes().contains(grid(0).localNode()); + assert !prj.nodes().contains(grid(1).localNode()); + assert prj.nodes().contains(grid(2).localNode()); + assert prj.nodes().contains(grid(3).localNode()); + assert !prj.nodes().contains(grid(4).localNode()); + } + + /** + * @throws Exception If failed. + */ + public void testProjectionForNamedCache() throws Exception { + ClusterGroup prj = ignite.cluster().forCache(CACHE_NAME); + + assert prj != null; + assert prj.nodes().size() == 3; + assert !prj.nodes().contains(grid(0).localNode()); + assert prj.nodes().contains(grid(1).localNode()); + assert prj.nodes().contains(grid(2).localNode()); + assert prj.nodes().contains(grid(3).localNode()); + assert !prj.nodes().contains(grid(4).localNode()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testProjectionForBothCaches() throws Exception { + ClusterGroup prj = ignite.cluster().forCache(null, CACHE_NAME); + + assert prj != null; + assert prj.nodes().size() == 2; + assert !prj.nodes().contains(grid(0).localNode()); + assert !prj.nodes().contains(grid(1).localNode()); + assert prj.nodes().contains(grid(2).localNode()); + assert prj.nodes().contains(grid(3).localNode()); + assert !prj.nodes().contains(grid(4).localNode()); + } + + /** + * @throws Exception If failed. + */ + public void testProjectionForWrongCacheName() throws Exception { + ClusterGroup prj = ignite.cluster().forCache("wrong"); + + assert prj != null; + assert prj.nodes().isEmpty(); + } + + /** + * @throws Exception If failed. + */ + public void testProjections() throws Exception { + ClusterNode locNode = ignite.cluster().localNode(); + UUID locId = locNode.id(); + + assertNotNull(locId); + + assertEquals(5, ignite.cluster().nodes().size()); + + ClusterGroup prj = ignite.cluster().forLocal(); + + assertEquals(1, prj.nodes().size()); + assertEquals(locNode, F.first(prj.nodes())); + + prj = ignite.cluster().forHost(locNode); + assertEquals(ignite.cluster().nodes().size(), prj.nodes().size()); + assertTrue(ignite.cluster().nodes().containsAll(prj.nodes())); + try { + ignite.cluster().forHost(null); + } + catch (NullPointerException ignored) { + // No-op. + } + + prj = ignite.cluster().forNode(locNode); + assertEquals(1, prj.nodes().size()); + + prj = ignite.cluster().forNode(locNode, locNode); + assertEquals(1, prj.nodes().size()); + + try { + ignite.cluster().forNode(null); + } + catch (NullPointerException ignored) { + // No-op. + } + + prj = ignite.cluster().forNodes(F.asList(locNode)); + assertEquals(1, prj.nodes().size()); + + prj = ignite.cluster().forNodes(F.asList(locNode, locNode)); + assertEquals(1, prj.nodes().size()); + + try { + ignite.cluster().forNodes(null); + } + catch (NullPointerException ignored) { + // No-op. 
+ } + + prj = ignite.cluster().forNodeId(locId); + assertEquals(1, prj.nodes().size()); + + prj = ignite.cluster().forNodeId(locId, locId); + assertEquals(1, prj.nodes().size()); + + try { + ignite.cluster().forNodeId(null); + } + catch (NullPointerException ignored) { + // No-op. + } + + prj = ignite.cluster().forNodeIds(F.asList(locId)); + assertEquals(1, prj.nodes().size()); + + prj = ignite.cluster().forNodeIds(F.asList(locId, locId)); + assertEquals(1, prj.nodes().size()); + + try { + ignite.cluster().forNodeIds(null); + } + catch (NullPointerException ignored) { + // No-op. + } + + prj = ignite.cluster().forOthers(locNode); + + assertEquals(4, prj.nodes().size()); + assertFalse(prj.nodes().contains(locNode)); + + assertEquals(4, ignite.cluster().forRemotes().nodes().size()); + assertTrue(prj.nodes().containsAll(ignite.cluster().forRemotes().nodes())); + + try { + ignite.cluster().forOthers((ClusterNode)null); + } + catch (NullPointerException ignored) { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java new file mode 100644 index 0000000..5a8740a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal;

import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.testframework.junits.common.*;

import java.util.*;
import java.util.concurrent.atomic.*;

import static org.apache.ignite.cache.GridCacheMode.*;

/**
 * Tests for methods that run job locally with multiple arguments.
 */
public class GridProjectionLocalJobMultipleArgumentsSelfTest extends GridCommonAbstractTest {
    /** IP finder. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** Identity hash codes of the closure instances that were actually executed. */
    private static Collection<Integer> ids;

    /** Accumulator updated by the runnable-style jobs. */
    private static AtomicInteger res;

    /**
     * Starts grid.
     */
    public GridProjectionLocalJobMultipleArgumentsSelfTest() {
        super(true);
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        // Build the pieces first, then attach them to the node configuration.
        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();

        discoSpi.setIpFinder(IP_FINDER);

        CacheConfiguration ccfg = defaultCacheConfiguration();

        ccfg.setCacheMode(PARTITIONED);
        ccfg.setBackups(1);

        IgniteConfiguration cfg = super.getConfiguration(gridName);

        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
        cfg.setCacheConfiguration(ccfg);
        cfg.setDiscoverySpi(discoSpi);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        // Fresh shared state for every test method.
        res = new AtomicInteger();
        ids = new GridConcurrentHashSet<>();
    }

    /**
     * Runs {@code affinityCall} once per key and expects three distinct closure instances.
     *
     * @throws Exception If failed.
     */
    public void testAffinityCall() throws Exception {
        Collection<Integer> results = new ArrayList<>();

        for (int key : F.asList(1, 2, 3)) {
            Integer jobRes = grid().compute().affinityCall(null, key, new IgniteCallable<Integer>() {
                @Override public Integer call() {
                    ids.add(System.identityHashCode(this));

                    return 10;
                }
            });

            results.add(jobRes);
        }

        assertEquals(30, F.sumInt(results));
        assertEquals(3, ids.size());
    }

    /**
     * Runs {@code affinityRun} once per key and expects three distinct closure instances.
     *
     * @throws Exception If failed.
     */
    public void testAffinityRun() throws Exception {
        for (int key : F.asList(1, 2, 3)) {
            grid().compute().affinityRun(null, key, new IgniteRunnable() {
                @Override public void run() {
                    ids.add(System.identityHashCode(this));

                    res.addAndGet(10);
                }
            });
        }

        assertEquals(30, res.get());
        assertEquals(3, ids.size());
    }

    /**
     * Applies one closure to arguments built with {@code F.asList}.
     *
     * @throws Exception If failed.
     */
    public void testCall() throws Exception {
        C1<Integer, Integer> plusTen = new C1<Integer, Integer>() {
            @Override public Integer apply(Integer arg) {
                ids.add(System.identityHashCode(this));

                return 10 + arg;
            }
        };

        Collection<Integer> results = grid().compute().apply(plusTen, F.asList(1, 2, 3));

        assertEquals(36, F.sumInt(results));
        assertEquals(3, ids.size());
    }

    /**
     * Applies one closure to arguments held in a pre-built collection.
     *
     * @throws Exception If failed.
     */
    public void testCallWithProducer() throws Exception {
        Collection<Integer> args = Arrays.asList(1, 2, 3);

        C1<Integer, Integer> plusTen = new C1<Integer, Integer>() {
            @Override public Integer apply(Integer arg) {
                ids.add(System.identityHashCode(this));

                return 10 + arg;
            }
        };

        Collection<Integer> results = grid().compute().apply(plusTen, args);

        assertEquals(36, F.sumInt(results));
        assertEquals(3, ids.size());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java new file mode 100644 index 0000000..ca11187 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.testframework.junits.common.*;

import java.util.*;

/**
 * Test for {@link org.apache.ignite.cluster.ClusterGroup}.
 */
@GridCommonTest(group = "Kernal Self")
public class GridProjectionSelfTest extends GridProjectionAbstractTest {
    /** Nodes count. */
    private static final int NODES_CNT = 4;

    /** Projection node IDs. */
    private static Collection<UUID> ids;

    /** First started node; used as the entry point by the tests below. */
    private static Ignite ignite;

    /** {@inheritDoc} */
    @SuppressWarnings({"ConstantConditions"})
    @Override protected void beforeTestsStarted() throws Exception {
        assert NODES_CNT > 2;

        ids = new LinkedList<>();

        for (int idx = 0; idx < NODES_CNT; idx++) {
            Ignite node = startGrid(idx);

            // Remember the very first node for later use in tests.
            if (idx == 0)
                ignite = node;

            ids.add(node.cluster().localNode().id());
        }
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        int idx = 0;

        while (idx < NODES_CNT)
            stopGrid(idx++);
    }

    /** {@inheritDoc} */
    @Override protected ClusterGroup projection() {
        return grid(0).forPredicate(F.<ClusterNode>nodeForNodeIds(ids));
    }

    /** {@inheritDoc} */
    @Override protected UUID localNodeId() {
        return grid(0).localNode().id();
    }

    /**
     * Checks that a randomly picked node belongs to the topology.
     *
     * @throws Exception If failed.
     */
    public void testRandom() throws Exception {
        ClusterNode rnd = ignite.cluster().forRandom().node();

        assertTrue(ignite.cluster().nodes().contains(rnd));
    }

    /**
     * Checks {@code forOldest()} against the node with the smallest order.
     *
     * @throws Exception If failed.
     */
    public void testOldest() throws Exception {
        ClusterGroup oldest = ignite.cluster().forOldest();

        long minOrder = Long.MAX_VALUE;

        ClusterNode expNode = null;

        for (ClusterNode cand : ignite.cluster().nodes()) {
            long order = cand.order();

            if (order >= minOrder)
                continue;

            expNode = cand;
            minOrder = order;
        }

        assertEquals(oldest.node(), ignite.cluster().forNode(expNode).node());
    }

    /**
     * Checks {@code forYoungest()} against the node with the largest order.
     *
     * @throws Exception If failed.
     */
    public void testYoungest() throws Exception {
        ClusterGroup youngest = ignite.cluster().forYoungest();

        long maxOrder = Long.MIN_VALUE;

        ClusterNode expNode = null;

        for (ClusterNode cand : ignite.cluster().nodes()) {
            long order = cand.order();

            if (order <= maxOrder)
                continue;

            expNode = cand;
            maxOrder = order;
        }

        assertEquals(youngest.node(), ignite.cluster().forNode(expNode).node());
    }

    /**
     * Checks that the youngest projection follows a newly started node while the oldest stays put.
     *
     * @throws Exception If failed.
     */
    public void testNewNodes() throws Exception {
        ClusterGroup youngest = ignite.cluster().forYoungest();
        ClusterGroup oldest = ignite.cluster().forOldest();

        ClusterNode oldBefore = oldest.node();
        ClusterNode youngBefore = youngest.node();

        assertNotNull(youngBefore);

        // The extra node is stopped automatically when the try block exits.
        try (Ignite extra = startGrid(NODES_CNT)) {
            ClusterNode extraNode = extra.cluster().localNode();

            ClusterNode youngAfter = youngest.node();

            assertNotNull(youngAfter);
            assertEquals(youngAfter.id(), extraNode.id());
            assertEquals(oldest.node(), oldBefore);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridReduceSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridReduceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridReduceSelfTest.java new file mode 100644 index 0000000..f83b7e8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridReduceSelfTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal;

import org.apache.ignite.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.testframework.junits.common.*;

import java.util.*;

/**
 * Test reduce with long operations.
 */
public class GridReduceSelfTest extends GridCommonAbstractTest {
    /** Number of nodes in the grid. */
    private static final int GRID_CNT = 3;

    /**
     * Synchronous variant: the reducer stops collecting after the fast closure's result.
     *
     * @throws Exception If failed.
     */
    public void testReduce() throws Exception {
        startGrids(GRID_CNT);

        try {
            Ignite ignite = grid(0);

            assert ignite.cluster().nodes().size() == GRID_CNT;

            List<ReducerTestClosure> jobs = closures(ignite.cluster().nodes().size());

            Long res = compute(ignite.cluster().forLocal()).call(jobs, stopOnOneReducer());

            assertEquals((Long)1L, res);

            checkClosures(jobs);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Asynchronous variant of {@link #testReduce()}.
     *
     * @throws Exception If failed.
     */
    public void testReduceAsync() throws Exception {
        startGrids(GRID_CNT);

        try {
            Ignite ignite = grid(0);

            assert ignite.cluster().nodes().size() == GRID_CNT;

            List<ReducerTestClosure> jobs = closures(ignite.cluster().nodes().size());

            IgniteCompute comp = compute(ignite.cluster().forLocal()).enableAsync();

            comp.call(jobs, stopOnOneReducer());

            IgniteFuture<Long> fut = comp.future();

            assertEquals((Long)1L, fut.get());

            checkClosures(jobs);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * @return Reducer that sums collected values and stops collecting once it sees {@code 1}.
     */
    private R1<Long, Long> stopOnOneReducer() {
        return new R1<Long, Long>() {
            private long sum;

            @Override public boolean collect(Long e) {
                info("Got result from closure: " + e);

                sum += e;

                // Stop collecting on value 1.
                return e != 1;
            }

            @Override public Long reduce() {
                return sum;
            }
        };
    }

    /**
     * Asserts that only the first closure has its {@code isFinished} flag set.
     *
     * @param jobs Closures that were submitted.
     */
    private void checkClosures(List<ReducerTestClosure> jobs) {
        assertTrue(jobs.get(0).isFinished);

        for (int i = 1; i < jobs.size(); i++)
            assertFalse("Closure #" + i + " is not interrupted.", jobs.get(i).isFinished);
    }

    /**
     * @param size Number of closures.
     * @return Collection of closures: one fast closure followed by normal ones.
     */
    private static List<ReducerTestClosure> closures(int size) {
        assert size > 1;

        List<ReducerTestClosure> res = new ArrayList<>(size);

        // Only the closure at index 0 is the fast one.
        for (int i = 0; i < size; i++)
            res.add(new ReducerTestClosure(i == 0));

        return res;
    }

    /**
     * Closure for testing reducer.
     */
    @SuppressWarnings("PackageVisibleField")
    private static class ReducerTestClosure implements IgniteCallable<Long> {
        /** Logger. */
        @IgniteLoggerResource
        private IgniteLogger log;

        /** Test flag to check the thread was interrupted. */
        volatile boolean isFinished;

        /** Fast or normal closure. */
        private boolean fast;

        /**
         * @param fast Fast or normal closure.
         */
        ReducerTestClosure(boolean fast) {
            this.fast = fast;
        }

        /** {@inheritDoc} */
        @Override public Long call() {
            try {
                try {
                    if (!fast) {
                        Thread.sleep(5000);

                        log.info("Returning 2 from normal closure.");

                        return 2L;
                    }

                    Thread.sleep(500);

                    log.info("Returning 1 from fast closure.");

                    return 1L;
                }
                finally {
                    // Runs on every exit path of the inner block, including interruption.
                    isFinished = true;
                }
            }
            catch (InterruptedException ignore) {
                log.info("Returning 0 from interrupted closure.");

                return 0L;
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridReleaseTypeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridReleaseTypeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridReleaseTypeSelfTest.java new file mode 100644 index 0000000..f58992f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridReleaseTypeSelfTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal;

import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.product.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.junits.common.*;

import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.*;

/**
 * Test grids starting with non compatible release types.
 */
public class GridReleaseTypeSelfTest extends GridCommonAbstractTest {
    /** IP finder. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** Counter. */
    private static final AtomicInteger cnt = new AtomicInteger();

    /** Build version reported by nodes with an even configuration index. */
    private String firstNodeVer;

    /** Build version reported by nodes with an odd configuration index. */
    private String secondNodeVer;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        final int idx = cnt.getAndIncrement();

        // Override node attributes in discovery spi.
        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi() {
            @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
                super.setNodeAttributes(attrs, ver);

                // Even and odd configurations alternate between the two versions.
                attrs.put(GridNodeAttributes.ATTR_BUILD_VER, idx % 2 == 0 ? firstNodeVer : secondNodeVer);
            }
        };

        discoSpi.setIpFinder(IP_FINDER);

        cfg.setDiscoverySpi(discoSpi);

        return cfg;
    }

    /**
     * @throws Exception If failed.
     */
    public void testNodeJoinTopologyWithDifferentReleaseType() throws Exception {
        firstNodeVer = "1.0.0-ent";
        secondNodeVer = "1.0.0-os";

        checkStartFails("Topology cannot contain nodes of both enterprise and open source");
    }

    /**
     * @throws Exception If failed.
     */
    public void testOsEditionDoesNotSupportRollingUpdates() throws Exception {
        firstNodeVer = "1.0.0-os";
        secondNodeVer = "1.0.1-os";

        checkStartFails("Local node and remote node have different version numbers");
    }

    /**
     * Starts two grids and expects the start to fail with an {@link IgniteCheckedException}
     * whose stack trace contains the given text. All grids are stopped afterwards.
     *
     * @param expMsg Text expected in the printed stack trace.
     * @throws Exception If failed.
     */
    private void checkStartFails(String expMsg) throws Exception {
        try {
            startGrids(2);
        }
        catch (IgniteCheckedException e) {
            StringWriter trace = new StringWriter();

            e.printStackTrace(new PrintWriter(trace));

            assertTrue("Caught exception does not contain specified string.", trace.toString().contains(expMsg));

            return;
        }
        finally {
            stopAllGrids();
        }

        fail("Exception has not been thrown.");
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridRuntimeExceptionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridRuntimeExceptionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridRuntimeExceptionSelfTest.java new file mode 100644 index 0000000..a3e21f8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridRuntimeExceptionSelfTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.events.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.testframework.junits.common.*;

import java.io.*;
import java.util.*;

import static org.apache.ignite.events.IgniteEventType.*;

/**
 * Tests runtime exception.
 * <p>
 * Each test makes {@link GridTaskFailedTestTask} throw a {@link RuntimeException} from a
 * different stage (map, result, reduce or job execution) and checks that the task future
 * fails and exactly one {@code EVT_TASK_FAILED} event is recorded for the task session.
 */
@SuppressWarnings({"ProhibitedExceptionDeclared"})
@GridCommonTest(group = "Kernal Self")
public class GridRuntimeExceptionSelfTest extends GridCommonAbstractTest {
    /** Stage at which the test task is asked to fail. */
    private enum FailType {
        /** Fail inside {@code map()}. */
        MAP,

        /** Fail inside {@code result()}. */
        RESULT,

        /** Fail inside {@code reduce()}. */
        REDUCE,

        /** Fail inside the job's {@code execute()}. */
        EXECUTE
    }

    /** */
    public GridRuntimeExceptionSelfTest() {
        super(/*start grid*/false);
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        startGrid();
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        stopGrid();
    }

    /**
     * @throws Exception If failed.
     */
    public void testExecuteFailed() throws Exception {
        checkTaskFails(FailType.EXECUTE, "Job failed event: ");
    }

    /**
     * @throws Exception If failed.
     */
    public void testMapFailed() throws Exception {
        checkTaskFails(FailType.MAP, "Task failed event: ");
    }

    /**
     * @throws Exception If failed.
     */
    public void testResultFailed() throws Exception {
        checkTaskFails(FailType.RESULT, "Task failed event: ");
    }

    /**
     * @throws Exception If failed.
     */
    public void testReduceFailed() throws Exception {
        // Must be REDUCE: passing RESULT here would only repeat testResultFailed()
        // and leave the failure branch in reduce() untested.
        checkTaskFails(FailType.REDUCE, "Task failed event: ");
    }

    /**
     * Deploys and runs {@link GridTaskFailedTestTask} with the given failure stage, expects the
     * future to fail with {@link IgniteCheckedException} and exactly one matching
     * {@code EVT_TASK_FAILED} event to be returned by the event query.
     *
     * @param failType Stage at which the task must fail.
     * @param evtLogPrefix Prefix of the log line that prints the found event.
     * @throws Exception If failed.
     */
    private void checkTaskFails(FailType failType, String evtLogPrefix) throws Exception {
        Ignite ignite = G.ignite(getTestGridName());

        ignite.compute().localDeployTask(GridTaskFailedTestTask.class, GridTaskFailedTestTask.class.getClassLoader());

        ComputeTaskFuture<?> fut =
            executeAsync(ignite.compute(), GridTaskFailedTestTask.class.getName(), failType);

        try {
            fut.get();

            assert false : "Task was expected to fail [failType=" + failType + ']';
        }
        catch (IgniteCheckedException e) {
            info("Got expected grid exception: " + e);
        }

        IgniteUuid sesId = fut.getTaskSession().getId();

        // Query for correct events.
        List<IgniteEvent> evts = ignite.events().remoteQuery(new TaskFailedEventFilter(sesId), 0);

        // Check the size before touching the first element, so an empty result fails the
        // assertion instead of throwing IndexOutOfBoundsException.
        assert evts.size() == 1 : "Unexpected task failed events [failType=" + failType + ", evts=" + evts + ']';

        info(evtLogPrefix + evts.get(0));
    }

    /** Accepts {@code EVT_TASK_FAILED} task events that belong to one task session. */
    private static class TaskFailedEventFilter implements IgnitePredicate<IgniteEvent> {
        /** Session ID to match. */
        private IgniteUuid sesId;

        /**
         * @param sesId Session ID.
         */
        TaskFailedEventFilter(IgniteUuid sesId) {
            this.sesId = sesId;
        }

        /** {@inheritDoc} */
        @Override public boolean apply(IgniteEvent evt) {
            return evt instanceof IgniteTaskEvent &&
                ((IgniteTaskEvent)evt).taskSessionId() != null &&
                ((IgniteTaskEvent)evt).taskSessionId().equals(sesId) &&
                evt.type() == EVT_TASK_FAILED;
        }
    }

    /** Task that throws a {@link RuntimeException} at the stage given by its argument. */
    private static class GridTaskFailedTestTask extends ComputeTaskAdapter<Serializable, Serializable> {
        /** */
        @IgniteLoggerResource
        private IgniteLogger log;

        /** Ignite instance. */
        @IgniteInstanceResource
        private Ignite ignite;

        /** Failure stage; assigned in {@code map()} from the task argument. */
        private FailType failType;

        /** {@inheritDoc} */
        @SuppressWarnings({"ProhibitedExceptionThrown"})
        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Serializable arg)
            throws IgniteCheckedException {
            if (log.isInfoEnabled())
                log.info("Mapping job [job=" + this + ", grid=" + subgrid + ", arg=" + arg + ']');

            failType = (FailType)arg;

            if (failType == FailType.MAP)
                throw new RuntimeException("Failed out of map method.");

            Map<ComputeJob, ClusterNode> map = new HashMap<>(2);

            assert subgrid.size() == 1;
            assert subgrid.get(0).id().equals(ignite.configuration().getNodeId());

            // One job without an argument and one that carries the failure stage.
            map.put(new GridTaskFailedTestJob(null), subgrid.get(0));
            map.put(new GridTaskFailedTestJob(failType), subgrid.get(0));

            return map;
        }

        /** {@inheritDoc} */
        @SuppressWarnings({"ProhibitedExceptionThrown"})
        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) throws IgniteCheckedException {
            if (failType == FailType.RESULT)
                throw new RuntimeException("Failing out of result method.");

            // Propagate a job failure (the EXECUTE case) as the task failure.
            if (res.getException() != null)
                throw res.getException();

            return ComputeJobResultPolicy.WAIT;
        }

        /** {@inheritDoc} */
        @SuppressWarnings({"ProhibitedExceptionThrown"})
        @Override public Serializable reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
            assert results != null;

            if (failType == FailType.REDUCE)
                throw new RuntimeException("Failed out of reduce method.");

            return (Serializable)results;
        }
    }

    /** Job that throws from {@code execute()} when its argument is {@link FailType#EXECUTE}. */
    private static class GridTaskFailedTestJob extends ComputeJobAdapter {
        /** */
        @IgniteLoggerResource
        private IgniteLogger log;

        /** */
        GridTaskFailedTestJob() {
            // No-op.
        }

        /**
         * @param arg Job argument.
         */
        GridTaskFailedTestJob(FailType arg) {
            super(arg);
        }

        /** {@inheritDoc} */
        @Override public Serializable execute() {
            if (log.isInfoEnabled())
                log.info("Executing job [job=" + this + ", arg=" + argument(0) + ']');

            // A null argument never equals the enum constant, so no separate null check is needed.
            if (argument(0) == FailType.EXECUTE) {
                // Throw exception.
                throw new RuntimeException("GridTaskFailedTestJob expected exception.");
            }

            return true;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java new file mode 100644 index 0000000..dc19476 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.testframework.junits.common.*;

import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.events.IgniteEventType.*;

/**
 * Starts two grids on the same vm, checks topologies of each grid and discovery
 * events while stopping one them.
 */
@GridCommonTest(group = "Kernal Self")
public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
    /**
     *
     */
    public GridSameVmStartupSelfTest() {
        super(false);
    }

    /**
     * JUnit.
     *
     * @throws Exception If failed.
     */
    public void testSameVmStartup() throws Exception {
        Ignite first = startGrid(1);

        Collection<ClusterNode> top1 = first.cluster().forRemotes().nodes();

        try {
            assert top1.isEmpty() : "Grid1 topology is not empty: " + top1;

            // Start another grid.
            Ignite second = startGrid(2);

            final CountDownLatch leftLatch = new CountDownLatch(1);

            int size1 = first.cluster().forRemotes().nodes().size();
            int size2 = second.cluster().forRemotes().nodes().size();

            assert size1 == 1 : "Invalid number of remote nodes discovered: " + size1;
            assert size2 == 1 : "Invalid number of remote nodes discovered: " + size2;

            final UUID firstId = first.cluster().localNode().id();

            // Listener on the second node: waits for the first node to leave gracefully.
            IgnitePredicate<IgniteEvent> lsnr = new IgnitePredicate<IgniteEvent>() {
                @Override public boolean apply(IgniteEvent evt) {
                    assert evt.type() != EVT_NODE_FAILED :
                        "Node1 did not exit gracefully.";

                    if (!(evt instanceof IgniteDiscoveryEvent))
                        return true;

                    IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;

                    // Local node can send METRICS_UPDATED event.
                    assert discoEvt.eventNode().id().equals(firstId) ||
                        evt.type() == EVT_NODE_METRICS_UPDATED :
                        "Received event about invalid node [received=" +
                            discoEvt.eventNode().id() + ", expected=" + firstId +
                            ", type=" + evt.type() + ']';

                    if (evt.type() == EVT_NODE_LEFT)
                        leftLatch.countDown();

                    return true;
                }
            };

            second.events().localListen(lsnr, EVTS_DISCOVERY);

            stopGrid(1);

            leftLatch.await();

            Collection<ClusterNode> top2 = second.cluster().forRemotes().nodes();

            assert top2.isEmpty() : "Grid2 topology is not empty: " + top2;
        }
        finally {
            stopGrid(1);
            stopGrid(2);
        }

        assert G.allGrids().isEmpty();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java new file mode 100644 index 0000000..d7c8cb1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more +
* contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.messaging.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Test for {@link org.apache.ignite.Ignite}. + */ +@GridCommonTest(group = "Kernal Self") +public class GridSelfTest extends GridProjectionAbstractTest { + /** Nodes count. 
*/ + private static final int NODES_CNT = 4; + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions"}) + @Override protected void beforeTestsStarted() throws Exception { + assert NODES_CNT > 2; + + for (int i = 0; i < NODES_CNT; i++) + startGrid(i); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected ClusterGroup projection() { + return grid(0); + } + + /** {@inheritDoc} */ + @Override protected UUID localNodeId() { + return grid(0).localNode().id(); + } + + /** {@inheritDoc} */ + @Override protected Collection<UUID> remoteNodeIds() { + return F.nodeIds(grid(0).forRemotes().nodes()); + } + + /** {@inheritDoc} */ + @Override public void testRemoteNodes() throws Exception { + int size = remoteNodeIds().size(); + + String name = "oneMoreGrid"; + + try { + Ignite g = startGrid(name); + + UUID joinedId = g.cluster().localNode().id(); + + assert projection().forRemotes().nodes().size() == size + 1; + + assert F.nodeIds(projection().forRemotes().nodes()).contains(joinedId); + } + finally { + stopGrid(name); + } + } + + /** {@inheritDoc} */ + @Override public void testRemoteProjection() throws Exception { + ClusterGroup remotePrj = projection().forRemotes(); + + int size = remotePrj.nodes().size(); + + String name = "oneMoreGrid"; + + try { + Ignite g = startGrid(name); + + UUID joinedId = g.cluster().localNode().id(); + + assert remotePrj.nodes().size() == size + 1; + + assert F.nodeIds(remotePrj.nodes()).contains(joinedId); + } + finally { + stopGrid(name); + } + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings({"TooBroadScope"}) + public void testAsyncListen() throws Exception { + final String msg = "HELLO!"; + + Ignite g = (Ignite)projection(); + + final UUID locNodeId = g.cluster().localNode().id(); + + g.message().remoteListen(null, new MessagingListenActor<String>() { + @Override protected void receive(UUID nodeId, String rcvMsg) throws Throwable { + assert locNodeId.equals(nodeId); + assert msg.equals(rcvMsg); + + stop(rcvMsg); + } + }); + + final AtomicInteger cnt = new AtomicInteger(); + + g.message().localListen(null, new P2<UUID, String>() { + @Override + public boolean apply(UUID nodeId, String msg) { + if (!locNodeId.equals(nodeId)) + cnt.incrementAndGet(); + + return true; + } + }); + + g.message().send(null, msg); + + Thread.sleep(1000); + + assert cnt.get() == g.cluster().forRemotes().nodes().size(); + } + + /** + * @throws Exception If failed. + */ + public void testForOthers() throws Exception { + ClusterNode node0 = grid(0).localNode(); + ClusterNode node1 = grid(1).localNode(); + ClusterNode node2 = grid(2).localNode(); + ClusterNode node3 = grid(3).localNode(); + + ClusterGroup p1 = grid(0).forOthers(node0); + + assertEquals(3, p1.nodes().size()); + + assertEquals(2, p1.forOthers(node1).nodes().size()); + + assertEquals(1, p1.forOthers(node1, node2).nodes().size()); + + assertEquals(1, grid(0).forOthers(node1, node2, node3).nodes().size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java new file mode 100644 index 0000000..5406939 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java @@ -0,0 +1,174 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.deployment.*; +import org.apache.ignite.spi.eventstorage.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Tests exceptions that are thrown by event storage and deployment spi. 
+ */ +@GridCommonTest(group = "Kernal Self") +public class GridSpiExceptionSelfTest extends GridCommonAbstractTest { + /** */ + private static final String TEST_MSG = "Test exception message"; + + /** */ + public GridSpiExceptionSelfTest() { + super(/*start Grid*/false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setEventStorageSpi(new GridTestRuntimeExceptionSpi()); + cfg.setDeploymentSpi(new GridTestCheckedExceptionSpi()); + + // Disable cache since it can deploy some classes during start process. + cfg.setCacheConfiguration(); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testSpiFail() throws Exception { + Ignite ignite = startGrid(); + + try { + try { + ignite.events().localQuery(F.<IgniteEvent>alwaysTrue()); + + assert false : "Exception should be thrown"; + } + catch (IgniteException e) { + assert e.getMessage().startsWith(TEST_MSG) : "Wrong exception message." + e.getMessage(); + } + + try { + ignite.compute().localDeployTask(GridTestTask.class, GridTestTask.class.getClassLoader()); + + assert false : "Exception should be thrown"; + } + catch (IgniteCheckedException e) { + assert e.getCause() instanceof GridTestSpiException : "Wrong cause exception type. " + e; + + assert e.getCause().getMessage().startsWith(TEST_MSG) : "Wrong exception message." + e.getMessage(); + } + } + finally { + stopGrid(); + } + } + + /** + * Test event storage spi that throws an exception on try to query local events. + */ + @IgniteSpiMultipleInstancesSupport(true) + private static class GridTestRuntimeExceptionSpi extends IgniteSpiAdapter implements EventStorageSpi { + /** {@inheritDoc} */ + @Override public void spiStart(String gridName) throws IgniteSpiException { + startStopwatch(); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public <T extends IgniteEvent> Collection<T> localEvents(IgnitePredicate<T> p) { + throw new IgniteException(TEST_MSG); + } + + /** {@inheritDoc} */ + @Override public void record(IgniteEvent evt) throws IgniteSpiException { + // No-op. + } + } + + /** + * Test deployment spi that throws an exception on try to register any class. + */ + @IgniteSpiMultipleInstancesSupport(true) + private static class GridTestCheckedExceptionSpi extends IgniteSpiAdapter implements DeploymentSpi { + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + startStopwatch(); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public DeploymentResource findResource(String rsrcName) { + // No-op. + return null; + } + + /** {@inheritDoc} */ + @Override public boolean register(ClassLoader ldr, Class<?> rsrc) throws IgniteSpiException { + throw new GridTestSpiException(TEST_MSG); + } + + /** {@inheritDoc} */ + @Override public boolean unregister(String rsrcName) { + // No-op. + return false; + } + + /** {@inheritDoc} */ + @Override public void setListener(DeploymentListener lsnr) { + // No-op. + } + } + + /** + * Test spi exception. + */ + private static class GridTestSpiException extends IgniteSpiException { + /** + * @param msg Error message. + */ + GridTestSpiException(String msg) { + super(msg); + } + + /** + * @param msg Error message. + * @param cause Error cause. + */ + GridTestSpiException(String msg, Throwable cause) { + super(msg, cause); + } + } +}