http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousJobAnnotationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousJobAnnotationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousJobAnnotationSelfTest.java new file mode 100644 index 0000000..4b33496 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousJobAnnotationSelfTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Test for various job callback annotations. + */ +@GridCommonTest(group = "Kernal Self") +public class GridContinuousJobAnnotationSelfTest extends GridCommonAbstractTest { + /** */ + private static final AtomicBoolean fail = new AtomicBoolean(); + + /** */ + private static final AtomicInteger afterSendCnt = new AtomicInteger(); + + /** */ + private static final AtomicInteger beforeFailoverCnt = new AtomicInteger(); + + /** */ + private static final AtomicReference<Exception> err = new AtomicReference<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setMarshalLocalJobs(false); + + return c; + } + + /** + * @throws Exception If test failed. + */ + public void testJobAnnotation() throws Exception { + testContinuousJobAnnotation(TestJob.class); + } + + /** + * @throws Exception If test failed. + */ + public void testJobChildAnnotation() throws Exception { + testContinuousJobAnnotation(TestJobChild.class); + } + + /** + * @param jobCls Job class. + * @throws Exception If test failed. + */ + public void testContinuousJobAnnotation(Class<?> jobCls) throws Exception { + try { + Ignite ignite = startGrid(0); + startGrid(1); + + fail.set(true); + + ignite.compute().execute(TestTask.class, jobCls); + + Exception e = err.get(); + + if (e != null) + throw e; + } + finally { + stopGrid(0); + stopGrid(1); + } + + assertEquals(2, afterSendCnt.getAndSet(0)); + assertEquals(1, beforeFailoverCnt.getAndSet(0)); + } + + /** */ + @SuppressWarnings({"PublicInnerClass", "unused"}) + public static class TestTask implements ComputeTask<Object, Object> { + /** */ + @IgniteTaskContinuousMapperResource + private ComputeTaskContinuousMapper mapper; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) throws IgniteCheckedException { + try { + mapper.send(((Class<ComputeJob>)arg).newInstance()); + } + catch (Exception e) { + throw new IgniteCheckedException("Job instantination failed.", e); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) + throws IgniteCheckedException { + if (res.getException() != null) { + if (res.getException() instanceof ComputeUserUndeclaredException) + throw new IgniteCheckedException("Job threw unexpected exception.", res.getException()); + + return ComputeJobResultPolicy.FAILOVER; + } + + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results.size() == 1 : "Unexpected result count: " + results.size(); + + return null; + } + } + + /** + * + */ + private static class TestJob extends ComputeJobAdapter { + /** */ + private boolean flag = true; + + /** */ + TestJob() { + X.println("Constructing TestJob [this=" + this + ", identity=" + System.identityHashCode(this) + "]"); + } + + + /** */ + @ComputeJobAfterSend + private void afterSend() { + X.println("AfterSend start TestJob [this=" + this + ", identity=" + System.identityHashCode(this) + + ", flag=" + flag + "]"); + + afterSendCnt.incrementAndGet(); + + flag = false; + + X.println("AfterSend end TestJob [this=" + this + ", identity=" + System.identityHashCode(this) + + ", flag=" + flag + "]"); + } + + /** */ + @ComputeJobBeforeFailover + private void beforeFailover() { + X.println("BeforeFailover start TestJob [this=" + this + ", identity=" + System.identityHashCode(this) + + ", flag=" + flag + "]"); + + beforeFailoverCnt.incrementAndGet(); + + flag = true; + + X.println("BeforeFailover end TestJob [this=" + this + ", identity=" + System.identityHashCode(this) + + ", flag=" + flag + "]"); + } + + /** {@inheritDoc} */ + @Override public Serializable execute() throws IgniteCheckedException { + X.println("Execute TestJob [this=" + this + ", identity=" + System.identityHashCode(this) + + ", flag=" + flag + "]"); + + if (!flag) { + String msg = "Flag is false on execute [this=" + this + ", identity=" + System.identityHashCode(this) + + ", flag=" + flag + "]"; + + X.println(msg); + + err.compareAndSet(null, new Exception(msg)); + } + + if (fail.get()) { + fail.set(false); + + throw new IgniteCheckedException("Expected test exception."); + } + + return null; + } + } + + /** + * + */ + private static class TestJobChild extends TestJob { + /** + * Required for reflectional creation. + */ + TestJobChild() { + // No-op. + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousJobSiblingsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousJobSiblingsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousJobSiblingsSelfTest.java new file mode 100644 index 0000000..6cb511e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousJobSiblingsSelfTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * Test continuous mapper with siblings. + */ +@GridCommonTest(group = "Kernal Self") +public class GridContinuousJobSiblingsSelfTest extends GridCommonAbstractTest { + /** */ + private static final int JOB_COUNT = 10; + + /** + * @throws Exception If test failed. + */ + public void testContinuousJobSiblings() throws Exception { + try { + Ignite ignite = startGrid(0); + startGrid(1); + + ignite.compute().execute(TestTask.class, null); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If test failed. + */ + public void testContinuousJobSiblingsLocalNode() throws Exception { + try { + Ignite ignite = startGrid(0); + + compute(ignite.cluster().forLocal()).execute(TestTask.class, null); + } + finally { + stopAllGrids(); + } + } + + /** */ + private static class TestTask extends ComputeTaskSplitAdapter<Object, Object> { + /** */ + @IgniteTaskContinuousMapperResource + private ComputeTaskContinuousMapper mapper; + + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + /** */ + private volatile int jobCnt; + + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException { + return Collections.singleton(new TestJob(++jobCnt)); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) + throws IgniteCheckedException { + if (res.getException() != null) + throw new IgniteCheckedException("Job resulted in error: " + res, res.getException()); + + assert ses.getJobSiblings().size() == jobCnt; + + if (jobCnt < JOB_COUNT) { + mapper.send(new TestJob(++jobCnt)); + + assert ses.getJobSiblings().size() == jobCnt; + } + + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assertEquals(JOB_COUNT, results.size()); + + return null; + } + } + + /** */ + private static class TestJob extends ComputeJobAdapter { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** + * @param sibCnt Siblings count to check. + */ + TestJob(int sibCnt) { + super(sibCnt); + } + + /** {@inheritDoc} */ + @Override public Serializable execute() throws IgniteCheckedException { + assert ses != null; + assert argument(0) != null; + + Integer sibCnt = argument(0); + + log.info("Executing job."); + + assert sibCnt != null; + + Collection<ComputeJobSibling> sibs = ses.getJobSiblings(); + + assert sibs != null; + assert sibs.size() == sibCnt : "Unexpected siblings collection [expectedSize=" + sibCnt + + ", siblingsCnt=" + sibs.size() + ", siblings=" + sibs + ']'; + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java new file mode 100644 index 0000000..f4729c4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * Continuous task test. + */ +@GridCommonTest(group = "Kernal Self") +public class GridContinuousTaskSelfTest extends GridCommonAbstractTest { + /** */ + private static final int JOB_COUNT = 10; + + /** */ + private static final int THREAD_COUNT = 10; + + /** + * @throws Exception If test failed. + */ + public void testContinuousJobsChain() throws Exception { + try { + Ignite ignite = startGrid(0); + + IgniteCompute comp = ignite.compute().enableAsync(); + + comp.execute(TestJobsChainTask.class, true); + + ComputeTaskFuture<Integer> fut1 = comp.future(); + + comp.execute(TestJobsChainTask.class, false); + + ComputeTaskFuture<Integer> fut2 = comp.future(); + + assert fut1.get() == 55; + assert fut2.get() == 55; + } + finally { + stopGrid(0); + } + } + + /** + * @throws Exception If test failed. + */ + public void testContinuousJobsChainMultiThreaded() throws Exception { + try { + final Ignite ignite = startGrid(0); + startGrid(1); + + GridTestUtils.runMultiThreaded(new Runnable() { + /** {@inheritDoc} */ + @Override public void run() { + try { + IgniteCompute comp = ignite.compute().enableAsync(); + + comp.execute(TestJobsChainTask.class, true); + + ComputeTaskFuture<Integer> fut1 = comp.future(); + + comp.execute(TestJobsChainTask.class, false); + + ComputeTaskFuture<Integer> fut2 = comp.future(); + + assert fut1.get() == 55; + assert fut2.get() == 55; + } + catch (IgniteCheckedException e) { + assert false : "Test task failed: " + e; + } + } + + }, THREAD_COUNT, "continuous-jobs-chain"); + } + finally { + stopGrid(0); + stopGrid(1); + } + } + + /** + * @throws Exception If test failed. + */ + public void testContinuousJobsSessionChain() throws Exception { + try { + Ignite ignite = startGrid(0); + startGrid(1); + + ignite.compute().execute(SessionChainTestTask.class, false); + } + finally { + stopGrid(0); + stopGrid(1); + } + } + + /** + * @throws Exception If test failed. + */ + public void testContinuousSlowMap() throws Exception { + try { + Ignite ignite = startGrid(0); + + Integer cnt = ignite.compute().execute(SlowMapTestTask.class, null); + + assert cnt != null; + assert cnt == 2 : "Unexpected result: " + cnt; + } + finally { + stopGrid(0); + } + } + + /** */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestJobsChainTask implements ComputeTask<Boolean, Integer> { + /** */ + @IgniteTaskContinuousMapperResource + private ComputeTaskContinuousMapper mapper; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + private int cnt; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Boolean arg) throws IgniteCheckedException { + assert mapper != null; + assert arg != null; + + ComputeJob job = new TestJob(++cnt); + + if (arg) { + mapper.send(job, subgrid.get(0)); + + log.info("Sent test task by continuous mapper: " + job); + } + else { + log.info("Will return job as map() result: " + job); + + return Collections.singletonMap(job, subgrid.get(0)); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) throws IgniteCheckedException { + assert mapper != null; + assert res.getException() == null : "Unexpected exception: " + res.getException(); + + log.info("Received job result [result=" + res + ", count=" + cnt + ']'); + + int tmp = ++cnt; + + if (tmp <= JOB_COUNT) { + mapper.send(new TestJob(tmp)); + + log.info("Sent test task by continuous mapper (from result() method)."); + } + + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results.size() == 10 : "Unexpected result count: " + results.size(); + + log.info("Called reduce() method [results=" + results + ']'); + + int res = 0; + + for (ComputeJobResult result : results) { + assert result.getData() != null : "Unexpected result data (null): " + result; + + res += (Integer)result.getData(); + } + + return res; + } + } + + /** */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestJob extends ComputeJobAdapter { + /** */ + public TestJob() { /* No-op. */ } + + /** + * @param arg Job argument. + */ + public TestJob(Integer arg) { + super(arg); + } + + /** {@inheritDoc} */ + @Override public Serializable execute() throws IgniteCheckedException { + Integer i = argument(0); + + return i != null ? i : 0; + } + } + + /** */ + @SuppressWarnings({"PublicInnerClass"}) + @ComputeTaskSessionFullSupport + public static class SessionChainTestTask extends ComputeTaskSplitAdapter<Object, Object> { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + /** */ + @IgniteTaskContinuousMapperResource + private ComputeTaskContinuousMapper mapper; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException { + ses.addAttributeListener(new ComputeTaskSessionAttributeListener() { + @Override public void onAttributeSet(Object key, Object val) { + if (key instanceof String) { + if (((String)key).startsWith("sendJob")) { + assert val instanceof Integer; + + int cnt = (Integer)val; + + if (cnt < JOB_COUNT) { + try { + mapper.send(new SessionChainTestJob(cnt)); + } + catch (IgniteCheckedException e) { + log.error("Failed to send new job.", e); + } + } + } + } + } + }, true); + + Collection<ComputeJob> jobs = new ArrayList<>(); + + for (int i = 0; i < JOB_COUNT; i++) + jobs.add(new SessionChainTestJob(0)); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assertEquals(JOB_COUNT * JOB_COUNT, results.size()); + + return null; + } + } + + /** */ + @SuppressWarnings({"PublicInnerClass"}) + public static class SessionChainTestJob extends ComputeJobAdapter { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + /** */ + @IgniteJobContextResource + private ComputeJobContext ctx; + + /** */ + public SessionChainTestJob() { /* No-op. */} + + /** + * @param arg Job argument. + */ + public SessionChainTestJob(Integer arg) { + super(arg); + } + + /** {@inheritDoc} */ + @Override public Serializable execute() throws IgniteCheckedException { + Integer i = argument(0); + + int arg = i != null ? i : 0; + + ses.setAttribute("sendJob" + ctx.getJobId(), 1 + arg); + + return arg; + } + } + + /** */ + @SuppressWarnings({"PublicInnerClass"}) + public static class SlowMapTestTask extends ComputeTaskAdapter<Object, Integer> { + /** */ + @IgniteTaskContinuousMapperResource + private ComputeTaskContinuousMapper mapper; + + /** */ + private int cnt; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) throws IgniteCheckedException { + mapper.send(new TestJob(++cnt)); + + try { + Thread.sleep(10000); + } + catch (InterruptedException e) { + throw new IgniteCheckedException("Job has been interrupted.", e); + } + + mapper.send(new TestJob(++cnt)); + + return null; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return results == null ? 0 : results.size(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridDeploymentMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDeploymentMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDeploymentMultiThreadedSelfTest.java new file mode 100644 index 0000000..01e596b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDeploymentMultiThreadedSelfTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; + +/** + * Task deployment tests. + */ +public class GridDeploymentMultiThreadedSelfTest extends GridCommonAbstractTest { + /** */ + private static final int THREAD_CNT = 20; + + /** */ + private static final int EXEC_CNT = 30000; + + /** + * @throws Exception If failed. + */ + public void testDeploy() throws Exception { + try { + final Ignite ignite = startGrid(0); + + ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader()); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + + ignite.compute().undeployTask(GridDeploymentTestTask.class.getName()); + + final CyclicBarrier barrier = new CyclicBarrier(THREAD_CNT, new Runnable() { + private int iterCnt; + + @Override public void run() { + try { + ignite.compute().undeployTask(GridDeploymentTestTask.class.getName()); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) == null; + + if (++iterCnt % 100 == 0) + info("Iterations count: " + iterCnt); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to undeploy task message.", e); + + fail("See logs for details."); + } + } + }); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + for (int i = 0; i < EXEC_CNT; i++) { + barrier.await(2000, MILLISECONDS); + + ignite.compute().localDeployTask(GridDeploymentTestTask.class, + GridDeploymentTestTask.class.getClassLoader()); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + } + } + catch (Exception e) { + U.error(log, "Test failed.", e); + + throw e; + } + finally { + info("Thread finished."); + } + + return null; + } + }, THREAD_CNT, "grid-load-test-thread"); + } + finally { + stopAllGrids(); + } + } + + /** + * Test task. + */ + private static class GridDeploymentTestTask extends ComputeTaskAdapter<Object, Object> { + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) throws IgniteCheckedException { + assert false; + + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridDeploymentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDeploymentSelfTest.java new file mode 100644 index 0000000..a246035 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDeploymentSelfTest.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.deployment.local.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Task deployment tests. + */ +@SuppressWarnings("unchecked") +@GridCommonTest(group = "Kernal Self") +public class GridDeploymentSelfTest extends GridCommonAbstractTest { + /** */ + private TestDeploymentSpi depSpi; + + /** */ + private boolean p2pEnabled = true; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + depSpi = new TestDeploymentSpi(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + depSpi = null; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setDeploymentSpi(depSpi = new TestDeploymentSpi()); + cfg.setPeerClassLoadingEnabled(p2pEnabled); + + // Disable cache since it can deploy some classes during start process. + cfg.setCacheConfiguration(); + + return cfg; + } + + /** */ + public GridDeploymentSelfTest() { + super(/*start grid*/false); + } + + /** + * @param ignite Grid. + * @param taskName Task name. + * @return {@code True} if task is not deployed. + */ + private boolean checkUndeployed(Ignite ignite, String taskName) { + return ignite.compute().localTasks().get(taskName) == null; + } + + /** + * @param ignite Grid. + */ + @SuppressWarnings({"CatchGenericClass"}) + private void stopGrid(Ignite ignite) { + try { + if (ignite != null) + stopGrid(ignite.name()); + } + catch (Throwable e) { + error("Got error when stopping grid.", e); + } + } + + /** + * @throws Exception If failed. + */ + public void testDeploy() throws Exception { + Ignite ignite = startGrid(getTestGridName()); + + try { + ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader()); + + assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + + ignite.compute().undeployTask(GridDeploymentTestTask.class.getName()); + + assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 1 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + assert checkUndeployed(ignite, GridDeploymentTestTask.class.getName()); + } + finally { + stopGrid(ignite); + } + } + + /** + * @throws Exception If failed. + */ + public void testIgnoreDeploymentSpi() throws Exception { + // If peer class loading is disabled and local deployment SPI + // is configured, SPI should be ignored. + p2pEnabled = false; + + Ignite ignite = startGrid(getTestGridName()); + + try { + ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader()); + + assert depSpi.getRegisterCount() == 0 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + ignite.compute().undeployTask(GridDeploymentTestTask.class.getName()); + + assert depSpi.getRegisterCount() == 0 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + } + finally { + stopGrid(ignite); + } + } + + /** + * @throws Exception If failed. + */ + public void testRedeploy() throws Exception { + Ignite ignite = startGrid(getTestGridName()); + + try { + // Added to work with P2P. + ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader()); + + // Check auto-deploy. + ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridDeploymentTestTask.class.getName(), null); + + fut.get(); + + assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + + // Check 2nd execute. + fut = executeAsync(ignite.compute(), GridDeploymentTestTask.class.getName(), null); + + fut.get(); + + assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + + // Redeploy, should be NO-OP for the same task. + ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader()); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + + assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + // Check 2nd execute. + fut = executeAsync(ignite.compute(), GridDeploymentTestTask.class.getName(), null); + + fut.get(); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + + assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + // Check undeploy. + ignite.compute().undeployTask(GridDeploymentTestTask.class.getName()); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) == null; + + assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 1 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + // Added to work with P2P + ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader()); + + // Check auto-deploy. + executeAsync(ignite.compute(), GridDeploymentTestTask.class.getName(), null); + + assert depSpi.getRegisterCount() == 2; + assert depSpi.getUnregisterCount() == 1; + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + + ignite.compute().localDeployTask(GridDeploymentTestTask1.class, + GridDeploymentTestTask1.class.getClassLoader()); + + try { + ignite.compute().localDeployTask(GridDeploymentTestTask2.class, + GridDeploymentTestTask2.class.getClassLoader()); + + assert false : "Should not be able to deploy 2 task with same task name"; + } + catch (IgniteCheckedException e) { + info("Received expected grid exception: " + e); + } + + assert depSpi.getRegisterCount() == 3 : "Invalid register count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 1 : "Invalid unregister count: " + depSpi.getUnregisterCount(); + + assert ignite.compute().localTasks().get("GridDeploymentTestTask") != null; + + Class<? extends ComputeTask<?, ?>> cls = ignite.compute().localTasks().get("GridDeploymentTestTask"); + + assert cls.getName().equals(GridDeploymentTestTask1.class.getName()); + } + finally { + stopGrid(ignite); + } + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"BusyWait"}) + public void testDeployOnTwoNodes() throws Exception { + Ignite ignite1 = startGrid(getTestGridName() + '1'); + Ignite ignite2 = startGrid(getTestGridName() + '2'); + + try { + assert !ignite1.cluster().forRemotes().nodes().isEmpty() : ignite1.cluster().forRemotes(); + assert !ignite2.cluster().forRemotes().nodes().isEmpty() : ignite2.cluster().forRemotes(); + + ignite1.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader()); + ignite2.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader()); + + assert ignite1.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + assert ignite2.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + + ignite1.compute().undeployTask(GridDeploymentTestTask.class.getName()); + + assert checkUndeployed(ignite1, GridDeploymentTestTask.class.getName()); + + int cnt = 0; + + boolean taskUndeployed = false; + + while (cnt++ < 10 && !taskUndeployed) { + taskUndeployed = checkUndeployed(ignite2, GridDeploymentTestTask.class.getName()); + + if (!taskUndeployed) + Thread.sleep(500); + } + + // Undeploy on one node should undeploy explicitly deployed + // tasks on the others + assert taskUndeployed; + } + finally { + stopGrid(ignite1); + stopGrid(ignite2); + } + } + + /** + * @throws Exception If failed. + */ + public void testDeployEvents() throws Exception { + Ignite ignite = startGrid(getTestGridName()); + + try { + DeploymentEventListener evtLsnr = new DeploymentEventListener(); + + ignite.events().localListen(evtLsnr, EVT_TASK_DEPLOYED, EVT_TASK_UNDEPLOYED); + + // Should generate 1st deployment event. + ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader()); + + assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + + // Should generate 1st un-deployment event. + ignite.compute().undeployTask(GridDeploymentTestTask.class.getName()); + + assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 1 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + assert checkUndeployed(ignite, GridDeploymentTestTask.class.getName()); + + // Should generate 2nd deployment event. + ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader()); + + assert depSpi.getRegisterCount() == 2 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 1 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + + // Should generate 2nd un-deployment event. + ignite.compute().undeployTask(GridDeploymentTestTask.class.getName()); + + assert depSpi.getRegisterCount() == 2 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 2 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + assert checkUndeployed(ignite, GridDeploymentTestTask.class.getName()); + + // Should generate 3rd deployment event. + ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader()); + + assert depSpi.getRegisterCount() == 3 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 2 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null; + + // Should generate 3rd un-deployment event. + ignite.compute().undeployTask(GridDeploymentTestTask.class.getName()); + + assert depSpi.getRegisterCount() == 3 : "Invalid deploy count: " + depSpi.getRegisterCount(); + assert depSpi.getUnregisterCount() == 3 : "Invalid undeploy count: " + depSpi.getUnregisterCount(); + + assert checkUndeployed(ignite, GridDeploymentTestTask.class.getName()); + + assert evtLsnr.getDeployCount() == 3 : "Invalid number of deployment events" + evtLsnr.getDeployCount(); + assert evtLsnr.getUndeployCount() == 3 : "Invalid number of un-deployment events" + evtLsnr.getDeployCount(); + } + finally { + stopGrid(ignite); + } + } + + /** + * Test deployable task. + */ + private static class GridDeploymentTestTask extends ComputeTaskAdapter<Object, Object> { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) + throws IgniteCheckedException { + Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size()); + + for (ClusterNode node : subgrid) { + map.put(new ComputeJobAdapter() { + @Override public Serializable execute() { + if (log.isInfoEnabled()) + log.info("Executing grid job: " + this); + + return null; + } + }, node); + } + + return map; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } + + /** + * Test deployable named task. + */ + @ComputeTaskName(value = "GridDeploymentTestTask") + private static class GridDeploymentTestTask1 extends ComputeTaskAdapter<Object, Object> { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) throws IgniteCheckedException { + Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size()); + + for (ClusterNode node : subgrid) { + map.put(new ComputeJobAdapter() { + @Override public Serializable execute() { + log.info("Executing grid job: " + this); + + return null; + } + }, node); + } + + return map; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } + + /** + * Test deployable named task. + */ + @ComputeTaskName(value = "GridDeploymentTestTask") + private static class GridDeploymentTestTask2 extends ComputeTaskAdapter<Object, Object> { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) throws IgniteCheckedException { + Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size()); + + for (ClusterNode node : subgrid) { + map.put(new ComputeJobAdapter() { + @Override public Serializable execute() { + if (log.isInfoEnabled()) + log.info("Executing grid job: " + this); + + return null; + } + }, node); + } + + return map; + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } + + /** + * + * Test deployment spi. + */ + private static class TestDeploymentSpi extends LocalDeploymentSpi { + /** */ + private volatile int deployCnt; + + /** */ + private volatile int undeployCnt; + + /** {@inheritDoc} */ + @Override public boolean register(ClassLoader ldr, Class rsrc) throws IgniteSpiException { + if (super.register(ldr, rsrc)) { + deployCnt++; + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean unregister(String rsrcName) { + undeployCnt++; + + return super.unregister(rsrcName); + } + + /** + * @return Deploy count. + */ + public int getRegisterCount() { + return deployCnt; + } + + /** + * @return Undeploy count. + */ + public int getUnregisterCount() { + return undeployCnt; + } + } + + /** + * Deployment listener. + */ + private static class DeploymentEventListener implements IgnitePredicate<IgniteEvent> { + /** */ + private int depCnt; + + /** */ + private int undepCnt; + + /** + * Gonna process task deployment events only. + * + * @param evt local grid event. + */ + @Override public boolean apply(IgniteEvent evt) { + if (evt.type() == EVT_TASK_DEPLOYED) + depCnt++; + else if (evt.type() == EVT_TASK_UNDEPLOYED) + undepCnt++; + + return true; + } + + /** + * @return Deploy count. + */ + public int getDeployCount() { + return depCnt; + } + + /** + * @return Undeploy count. + */ + public int getUndeployCount() { + return undepCnt; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java new file mode 100644 index 0000000..d57c8ee --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Tests discovery event topology snapshots. + */ +public class GridDiscoveryEventSelfTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Daemon flag. */ + private boolean daemon; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + daemon = false; + } + + /** */ + private static final IgniteClosure<ClusterNode, UUID> NODE_2ID = new IgniteClosure<ClusterNode, UUID>() { + @Override public UUID apply(ClusterNode n) { + return n.id(); + } + + @Override public String toString() { + return "Grid node shadow to node ID transformer closure."; + } + }; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setDaemon(daemon); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + c.setRestEnabled(false); + + return c; + } + + /** + * @throws Exception If failed. + */ + public void testJoinSequenceEvents() throws Exception { + try { + Ignite g0 = startGrid(0); + + UUID id0 = g0.cluster().localNode().id(); + + final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); + + g0.events().localListen(new IgnitePredicate<IgniteEvent>() { + private AtomicInteger cnt = new AtomicInteger(); + + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_NODE_JOINED; + + evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes()); + + return true; + } + }, EVT_NODE_JOINED); + + UUID id1 = startGrid(1).cluster().localNode().id(); + UUID id2 = startGrid(2).cluster().localNode().id(); + UUID id3 = startGrid(3).cluster().localNode().id(); + + U.sleep(100); + + assertEquals("Wrong count of events received", 3, evts.size()); + + Collection<ClusterNode> top0 = evts.get(0); + + assertNotNull(top0); + assertEquals(2, top0.size()); + assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0)); + assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id1)); + + Collection<ClusterNode> top1 = evts.get(1); + + assertNotNull(top1); + assertEquals(3, top1.size()); + assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0)); + assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1)); + assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id2)); + + Collection<ClusterNode> top2 = evts.get(2); + + assertNotNull(top2); + assertEquals(4, top2.size()); + assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0)); + assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1)); + assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2)); + assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id3)); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testLeaveSequenceEvents() throws Exception { + try { + Ignite g0 = startGrid(0); + + UUID id0 = g0.cluster().localNode().id(); + UUID id1 = startGrid(1).cluster().localNode().id(); + UUID id2 = startGrid(2).cluster().localNode().id(); + UUID id3 = startGrid(3).cluster().localNode().id(); + + final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); + + g0.events().localListen(new IgnitePredicate<IgniteEvent>() { + private AtomicInteger cnt = new AtomicInteger(); + + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_NODE_LEFT; + + evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes()); + + return true; + } + }, EVT_NODE_LEFT); + + stopGrid(3); + stopGrid(2); + stopGrid(1); + + U.sleep(100); + + assertEquals("Wrong count of events received", 3, evts.size()); + + Collection<ClusterNode> top2 = evts.get(0); + + assertNotNull(top2); + assertEquals(3, top2.size()); + assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0)); + assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1)); + assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2)); + assertFalse(F.viewReadOnly(top2, NODE_2ID).contains(id3)); + + Collection<ClusterNode> top1 = evts.get(1); + + assertNotNull(top1); + assertEquals(2, top1.size()); + assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0)); + assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1)); + assertFalse(F.viewReadOnly(top1, NODE_2ID).contains(id2)); + assertFalse(F.viewReadOnly(top1, NODE_2ID).contains(id3)); + + Collection<ClusterNode> top0 = evts.get(2); + + assertNotNull(top0); + assertEquals(1, top0.size()); + assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0)); + assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id1)); + assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id2)); + assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id3)); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testMixedSequenceEvents() throws Exception { + try { + Ignite g0 = startGrid(0); + + UUID id0 = g0.cluster().localNode().id(); + + final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); + + g0.events().localListen(new IgnitePredicate<IgniteEvent>() { + private AtomicInteger cnt = new AtomicInteger(); + + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT; + + evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes()); + + return true; + } + }, EVT_NODE_JOINED, EVT_NODE_LEFT); + + UUID id1 = startGrid(1).cluster().localNode().id(); + UUID id2 = startGrid(2).cluster().localNode().id(); + UUID id3 = startGrid(3).cluster().localNode().id(); + + stopGrid(3); + stopGrid(2); + stopGrid(1); + + UUID id4 = startGrid(4).cluster().localNode().id(); + + stopGrid(4); + + U.sleep(100); + + assertEquals("Wrong count of events received", 8, evts.size()); + + Collection<ClusterNode> top0 = evts.get(0); + + assertNotNull(top0); + assertEquals(2, top0.size()); + assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0)); + assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id1)); + + Collection<ClusterNode> top1 = evts.get(1); + + assertNotNull(top1); + assertEquals(3, top1.size()); + assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0)); + assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1)); + assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id2)); + + Collection<ClusterNode> top2 = evts.get(2); + + assertNotNull(top2); + assertEquals(4, top2.size()); + assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0)); + assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1)); + assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2)); + assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id3)); + + Collection<ClusterNode> top3 = evts.get(3); + + assertNotNull(top3); + assertEquals(3, top3.size()); + assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id0)); + assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id1)); + assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id2)); + assertFalse(F.viewReadOnly(top3, NODE_2ID).contains(id3)); + + Collection<ClusterNode> top4 = evts.get(4); + + assertNotNull(top4); + assertEquals(2, top4.size()); + assertTrue(F.viewReadOnly(top4, NODE_2ID).contains(id0)); + assertTrue(F.viewReadOnly(top4, NODE_2ID).contains(id1)); + assertFalse(F.viewReadOnly(top4, NODE_2ID).contains(id2)); + assertFalse(F.viewReadOnly(top4, NODE_2ID).contains(id3)); + + Collection<ClusterNode> top5 = evts.get(5); + + assertNotNull(top5); + assertEquals(1, top5.size()); + assertTrue(F.viewReadOnly(top5, NODE_2ID).contains(id0)); + assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id1)); + assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id2)); + assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id3)); + + Collection<ClusterNode> top6 = evts.get(6); + + assertNotNull(top6); + assertEquals(2, top6.size()); + assertTrue(F.viewReadOnly(top6, NODE_2ID).contains(id0)); + assertTrue(F.viewReadOnly(top6, NODE_2ID).contains(id4)); + assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id1)); + assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id2)); + assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id3)); + + Collection<ClusterNode> top7 = evts.get(7); + + assertNotNull(top7); + assertEquals(1, top7.size()); + assertTrue(F.viewReadOnly(top7, NODE_2ID).contains(id0)); + assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id1)); + assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id2)); + assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id3)); + assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id4)); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentJoinEvents() throws Exception { + try { + Ignite g0 = startGrid(0); + + UUID id0 = g0.cluster().localNode().id(); + + final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); + + g0.events().localListen(new IgnitePredicate<IgniteEvent>() { + private AtomicInteger cnt = new AtomicInteger(); + + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_NODE_JOINED; + + X.println(">>>>>>> Joined " + F.viewReadOnly(((IgniteDiscoveryEvent) evt).topologyNodes(), + NODE_2ID)); + + evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes()); + + return true; + } + }, EVT_NODE_JOINED); + + U.sleep(100); + + startGridsMultiThreaded(1, 10); + + U.sleep(100); + + assertEquals(10, evts.size()); + + for (int i = 0; i < 10; i++) { + Collection<ClusterNode> snapshot = evts.get(i); + + assertEquals(2 + i, snapshot.size()); + assertTrue(F.viewReadOnly(snapshot, NODE_2ID).contains(id0)); + + for (ClusterNode n : snapshot) + assertTrue("Wrong node order in snapshot [i=" + i + ", node=" + n + ']', n.order() <= 2 + i); + } + + Collection<UUID> ids = F.viewReadOnly(evts.get(9), NODE_2ID); + + for (int i = 1; i <= 10; i++) + assertTrue(ids.contains(grid(i).localNode().id())); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testDaemonNodeJoin() throws Exception { + try { + startGridsMultiThreaded(3); + + final AtomicReference<IgniteCheckedException> err = new AtomicReference<>(); + + for (int i = 0; i < 3; i++) { + Ignite g = grid(i); + + g.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent) evt; + + if (discoEvt.topologyNodes().size() != 3) + err.compareAndSet(null, new IgniteCheckedException("Invalid discovery event [evt=" + discoEvt + + ", nodes=" + discoEvt.topologyNodes() + ']')); + + return true; + } + }, IgniteEventType.EVT_NODE_JOINED); + } + + daemon = true; + + GridKernal daemon = (GridKernal)startGrid(3); + + IgniteDiscoveryEvent join = daemon.context().discovery().localJoinEvent(); + + assertEquals(3, join.topologyNodes().size()); + + U.sleep(100); + + if (err.get() != null) + throw err.get(); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java new file mode 100644 index 0000000..dee5200 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.product.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.product.IgniteProductVersion.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * GridDiscovery self test. + */ +public class GridDiscoverySelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static Ignite ignite; + + /** Nodes count. */ + private static final int NODES_CNT = 5; + + /** Maximum timeout when remote nodes join/left the topology */ + private static final int MAX_TIMEOUT_IN_MINS = 5; + + /** */ + public GridDiscoverySelfTest() { + super(/*start grid*/true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + //cacheCfg.setName(null); + cacheCfg.setCacheMode(GridCacheMode.PARTITIONED); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite = G.ignite(getTestGridName()); + } + + /** + * @throws Exception If failed. + */ + public void testGetRemoteNodes() throws Exception { + Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes(); + + printNodes(nodes); + } + + /** + * @throws Exception If failed. + */ + public void testGetAllNodes() throws Exception { + Collection<ClusterNode> nodes = ignite.cluster().nodes(); + + printNodes(nodes); + + assert nodes != null; + assert !nodes.isEmpty(); + } + + /** + * @throws Exception If failed. + */ + public void testGetTopologyHash() throws Exception { + int hashCnt = 5000; + + Random rand = new Random(); + + Collection<Long> hashes = new HashSet<>(hashCnt, 1.0f); + + for (int i = 0; i < hashCnt; i++) { + // Max topology of 10 nodes. + int size = rand.nextInt(10) + 1; + + Collection<ClusterNode> nodes = new ArrayList<>(size); + + for (int j = 0; j < size; j++) + nodes.add(new GridDiscoveryTestNode()); + + @SuppressWarnings("deprecation") + long hash = ((GridKernal) ignite).context().discovery().topologyHash(nodes); + + boolean isHashed = hashes.add(hash); + + assert isHashed : "Duplicate hash [hash=" + hash + ", topSize=" + size + ", iteration=" + i + ']'; + } + + info("No duplicates found among '" + hashCnt + "' hashes."); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"SuspiciousMethodCalls"}) + public void testGetLocalNode() throws Exception { + ClusterNode node = ignite.cluster().localNode(); + + assert node != null; + + Collection<ClusterNode> nodes = ignite.cluster().nodes(); + + assert nodes != null; + assert nodes.contains(node); + } + + /** + * @throws Exception If failed. + */ + public void testPingNode() throws Exception { + ClusterNode node = ignite.cluster().localNode(); + + assert node != null; + + boolean pingRes = ignite.cluster().pingNode(node.id()); + + assert pingRes : "Failed to ping local node."; + } + + /** + * @throws Exception If failed. + */ + public void testDiscoveryListener() throws Exception { + ClusterNode node = ignite.cluster().localNode(); + + assert node != null; + + final AtomicInteger cnt = new AtomicInteger(); + + /** Joined nodes counter. */ + final CountDownLatch joinedCnt = new CountDownLatch(NODES_CNT); + + /** Left nodes counter. */ + final CountDownLatch leftCnt = new CountDownLatch(NODES_CNT); + + IgnitePredicate<IgniteEvent> lsnr = new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + if (EVT_NODE_JOINED == evt.type()) { + cnt.incrementAndGet(); + + joinedCnt.countDown(); + } + else if (EVT_NODE_LEFT == evt.type()) { + int i = cnt.decrementAndGet(); + + assert i >= 0; + + leftCnt.countDown(); + } + else + assert false; + + return true; + } + }; + + ignite.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_JOINED); + + try { + for (int i = 0; i < NODES_CNT; i++) + startGrid(i); + + joinedCnt.await(MAX_TIMEOUT_IN_MINS, MINUTES); + + assert cnt.get() == NODES_CNT; + + for (int i = 0; i < NODES_CNT; i++) + stopGrid(i); + + leftCnt.await(MAX_TIMEOUT_IN_MINS, MINUTES); + + assert cnt.get() == 0; + + ignite.events().stopLocalListen(lsnr); + + assert cnt.get() == 0; + } + finally { + for (int i = 0; i < NODES_CNT; i++) + stopAndCancelGrid(i); + } + } + + /** + * Test cache nodes resolved correctly from topology history. + * + * @throws Exception In case of any exception. + */ + public void testCacheNodes() throws Exception { + // Validate only original node is available. + GridDiscoveryManager discoMgr = ((GridKernal) ignite).context().discovery(); + + Collection<ClusterNode> nodes = discoMgr.allNodes(); + + assert nodes.size() == 1 : "Expects only original node is available: " + nodes; + + final long topVer0 = discoMgr.topologyVersion(); + + assert topVer0 > 0 : "Unexpected initial topology version: " + topVer0; + + List<UUID> uuids = new ArrayList<>(NODES_CNT); + + UUID locId = ignite.cluster().localNode().id(); + + try { + // Start nodes. + for (int i = 0; i < NODES_CNT; i++) + uuids.add(startGrid(i).cluster().localNode().id()); + + // Stop nodes. + for (int i = 0; i < NODES_CNT; i++) + stopGrid(i); + + final long topVer = discoMgr.topologyVersion(); + + assert topVer == topVer0 + NODES_CNT * 2 : "Unexpected topology version: " + topVer; + + for (long ver = topVer0; ver <= topVer; ver++) { + Collection<UUID> exp = new ArrayList<>(); + + exp.add(locId); + + for (int i = 0; i < NODES_CNT && i < ver - topVer0; i++) + exp.add(uuids.get(i)); + + for (int i = 0; i < ver - topVer0 - NODES_CNT; i++) + exp.remove(uuids.get(i)); + + // Cache nodes by topology version (e.g. NODE_CNT == 3). + // 0 1 2 3 (node id) + // 1 (topVer) + - only local node + // 2 + + + // 3 + + + + // 4 + + + + + // 5 + + + + // 6 + + + // 7 + - only local node + + Collection<ClusterNode> cacheNodes = discoMgr.cacheNodes(null, ver); + + Collection<UUID> act = new ArrayList<>(F.viewReadOnly(cacheNodes, new C1<ClusterNode, UUID>() { + @Override public UUID apply(ClusterNode n) { + return n.id(); + } + })); + + assertEquals("Expects correct cache nodes for topology version: " + ver, exp, act); + } + } + finally { + for (int i = 0; i < NODES_CNT; i++) + stopAndCancelGrid(i); + } + } + + /** + * @param nodes Nodes. + */ + private void printNodes(Collection<ClusterNode> nodes) { + StringBuilder buf = new StringBuilder(); + + if (nodes != null && !nodes.isEmpty()) { + buf.append("Found nodes [nodes={"); + + int i = 0; + + for (Iterator<ClusterNode> iter = nodes.iterator(); iter.hasNext(); i++) { + ClusterNode node = iter.next(); + + buf.append(node.id()); + + if (i + 1 != nodes.size()) + buf.append(", "); + } + + buf.append("}]"); + } + else + buf.append("Found no nodes."); + + if (log().isDebugEnabled()) + log().debug(buf.toString()); + } + + /** + * + */ + private static class GridDiscoveryTestNode extends GridMetadataAwareAdapter implements ClusterNode { + /** */ + private static AtomicInteger consistentIdCtr = new AtomicInteger(); + + /** */ + private UUID nodeId = UUID.randomUUID(); + + /** */ + private Object consistentId = consistentIdCtr.incrementAndGet(); + + /** {@inheritDoc} */ + @Override public long order() { + return -1; + } + + /** {@inheritDoc} */ + @Override public IgniteProductVersion version() { + return fromString("99.99.99"); + } + + /** {@inheritDoc} */ + @Override public UUID id() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public Object consistentId() { + return consistentId; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <T> T attribute(String name) { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public ClusterNodeMetrics metrics() { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<String, Object> attributes() { + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<String> addresses() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isLocal() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isDaemon() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isClient() { + return false; + } + + /** {@inheritDoc} */ + @Override public Collection<String> hostNames() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return F.eqNodes(this, o); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id().hashCode(); + } + } +}