http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobMasterLeaveAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobMasterLeaveAwareSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobMasterLeaveAwareSelfTest.java deleted file mode 100644 index e1f32d9..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobMasterLeaveAwareSelfTest.java +++ /dev/null @@ -1,802 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.spi.communication.tcp.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.cache.GridCacheMode.*; - -/** - * Test behavior of jobs when master node has failed, but job class implements {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} - * interface. - */ -@GridCommonTest(group = "Task Session") -public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest { - /** Total grid count within the cloud. */ - private static final int GRID_CNT = 2; - - /** Default IP finder for single-JVM cloud grid. */ - private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Counts how many times master-leave interface implementation was called. */ - private static volatile CountDownLatch invokeLatch; - - /** Latch which blocks job execution until main thread has sent node fail signal. */ - private static volatile CountDownLatch latch; - - /** Latch which blocks main thread until all jobs start their execution. */ - private static volatile CountDownLatch jobLatch; - - /** Should job wait for callback. */ - private static volatile boolean awaitMasterLeaveCallback = true; - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - awaitMasterLeaveCallback = true; - latch = new CountDownLatch(1); - jobLatch = new CountDownLatch(GRID_CNT - 1); - invokeLatch = new CountDownLatch(GRID_CNT - 1); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - discoSpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoSpi); - - cfg.setCommunicationSpi(new CommunicationSpi()); - cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); - - CacheConfiguration ccfg = defaultCacheConfiguration(); - - ccfg.setCacheMode(PARTITIONED); - - cfg.setCacheConfiguration(ccfg); - - return cfg; - } - - /** - * Get predicate which allows task execution on all nodes except the last one. - * - * @return Predicate. - */ - private IgnitePredicate<ClusterNode> excludeLastPredicate() { - return new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return !e.id().equals(grid(GRID_CNT - 1).localNode().id()); - } - }; - } - - /** - * Constructor. - */ - public GridJobMasterLeaveAwareSelfTest() { - super(/* don't start grid */ false); - } - - /** - * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked on job which is initiated by - * master and is currently running on it. - * - * @throws Exception If failed. - */ - public void testLocalJobOnMaster() throws Exception { - invokeLatch = new CountDownLatch(1); - jobLatch = new CountDownLatch(1); - - Ignite g = startGrid(0); - - g.compute().enableAsync().execute(new TestTask(1), null); - - jobLatch.await(); - - // Count down the latch in a separate thread. - new Thread(new Runnable() { - @Override public void run() { - try { - U.sleep(500); - } - catch (IgniteInterruptedException ignore) { - // No-op. - } - - latch.countDown(); - } - }).start(); - - stopGrid(0, true); - - latch.countDown(); - - assert invokeLatch.await(5000, MILLISECONDS); - } - - /** - * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked when master node leaves topology normally. - * - * @throws Exception If failed. - */ - public void testMasterStoppedNormally() throws Exception { - // Start grids. - for (int i = 0; i < GRID_CNT; i++) - startGrid(i); - - int lastGridIdx = GRID_CNT - 1; - - compute(grid(lastGridIdx).forPredicate(excludeLastPredicate())).enableAsync(). - execute(new TestTask(GRID_CNT - 1), null); - - jobLatch.await(); - - stopGrid(lastGridIdx, true); - - latch.countDown(); - - assert invokeLatch.await(5000, MILLISECONDS); - } - - /** - * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked when master node leaves topology - * abruptly (e.g. due to a network failure or immediate node shutdown). - * - * @throws Exception If failed. - */ - public void testMasterStoppedAbruptly() throws Exception { - // Start grids. - for (int i = 0; i < GRID_CNT; i++) - startGrid(i); - - int lastGridIdx = GRID_CNT - 1; - - compute(grid(lastGridIdx).forPredicate(excludeLastPredicate())).enableAsync(). - execute(new TestTask(GRID_CNT - 1), null); - - jobLatch.await(); - - ((CommunicationSpi)grid(lastGridIdx).configuration().getCommunicationSpi()).blockMessages(); - - stopGrid(lastGridIdx, true); - - latch.countDown(); - - assert invokeLatch.await(5000, MILLISECONDS); - } - - /** - * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked when fails to send - * {@link GridJobExecuteResponse} to master node. - * - * @throws Exception If failed. - */ - public void testCannotSendJobExecuteResponse() throws Exception { - awaitMasterLeaveCallback = false; - - // Start grids. - for (int i = 0; i < GRID_CNT; i++) - startGrid(i); - - int lastGridIdx = GRID_CNT - 1; - - compute(grid(lastGridIdx).forPredicate(excludeLastPredicate())).enableAsync(). - execute(new TestTask(GRID_CNT - 1), null); - - jobLatch.await(); - - for (int i = 0; i < lastGridIdx; i++) - ((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).waitLatch(); - - latch.countDown(); - - // Ensure that all worker nodes has already started job response sending. - for (int i = 0; i < lastGridIdx; i++) - ((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).awaitResponse(); - - // Now we stop master grid. - stopGrid(lastGridIdx, true); - - // Release communication SPI wait latches. As master node is stopped, job worker will receive and exception. - for (int i = 0; i < lastGridIdx; i++) - ((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).releaseWaitLatch(); - - assert invokeLatch.await(5000, MILLISECONDS); - } - - /** - * @throws Exception If failed. - */ - public void testApply1() throws Exception { - testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup grid) throws IgniteCheckedException { - IgniteCompute comp = compute(grid).enableAsync(); - - comp.apply(new TestClosure(), "arg"); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testApply2() throws Exception { - testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup grid) throws IgniteCheckedException { - IgniteCompute comp = compute(grid).enableAsync(); - - comp.apply(new TestClosure(), Arrays.asList("arg1", "arg2")); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testApply3() throws Exception { - testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup grid) throws IgniteCheckedException { - IgniteCompute comp = compute(grid).enableAsync(); - - comp.apply(new TestClosure(), - Arrays.asList("arg1", "arg2"), - new IgniteReducer<Void, Object>() { - @Override public boolean collect(@Nullable Void aVoid) { - return true; - } - - @Override public Object reduce() { - return null; - } - }); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testRun1() throws Exception { - testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { - IgniteCompute comp = compute(prj).enableAsync(); - - comp.run(new TestRunnable()); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testRun2() throws Exception { - testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { - IgniteCompute comp = compute(prj).enableAsync(); - - comp.run(Arrays.asList(new TestRunnable(), new TestRunnable())); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testCall1() throws Exception { - testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { - IgniteCompute comp = compute(prj).enableAsync(); - - comp.call(new TestCallable()); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testCall2() throws Exception { - testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { - IgniteCompute comp = compute(prj).enableAsync(); - - comp.call(Arrays.asList(new TestCallable(), new TestCallable())); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testCall3() throws Exception { - testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { - IgniteCompute comp = compute(prj).enableAsync(); - - comp.call( - Arrays.asList(new TestCallable(), new TestCallable()), - new IgniteReducer<Void, Object>() { - @Override public boolean collect(@Nullable Void aVoid) { - return true; - } - - @Override public Object reduce() { - return null; - } - }); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testBroadcast1() throws Exception { - testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { - IgniteCompute comp = compute(prj).enableAsync(); - - comp.broadcast(new TestRunnable()); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testBroadcast2() throws Exception { - testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { - IgniteCompute comp = compute(prj).enableAsync(); - - comp.broadcast(new TestCallable()); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testBroadcast3() throws Exception { - testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { - IgniteCompute comp = compute(prj).enableAsync(); - - comp.broadcast(new TestClosure(), "arg"); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testAffinityRun() throws Exception { - testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { - IgniteCompute comp = compute(prj).enableAsync(); - - GridCacheAffinity<Object> aff = prj.ignite().cache(null).affinity(); - - ClusterNode node = F.first(prj.nodes()); - - comp.affinityRun(null, keyForNode(aff, node), new TestRunnable()); - - return comp.future(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testAffinityCall() throws Exception { - testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() { - @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws IgniteCheckedException { - IgniteCompute comp = compute(prj).enableAsync(); - - GridCacheAffinity<Object> aff = prj.ignite().cache(null).affinity(); - - ClusterNode node = F.first(prj.nodes()); - - comp.affinityCall(null, keyForNode(aff, node), new TestCallable()); - - return comp.future(); - } - }); - } - - /** - * @param aff Cache affinity. - * @param node Node. - * @return Finds some cache key for which given node is primary. - */ - private Object keyForNode(GridCacheAffinity<Object> aff, ClusterNode node) { - assertNotNull(node); - - Object key = null; - - for (int i = 0; i < 1000; i++) { - if (aff.isPrimary(node, i)) { - key = i; - - break; - } - } - - assertNotNull(key); - - return key; - } - - /** - * @param expJobs Expected jobs number. - * @param taskStarter Task started. - * @throws Exception If failed. - */ - private void testMasterLeaveAwareCallback(int expJobs, IgniteClosure<ClusterGroup, IgniteFuture<?>> taskStarter) - throws Exception { - jobLatch = new CountDownLatch(expJobs); - invokeLatch = new CountDownLatch(expJobs); - - for (int i = 0; i < GRID_CNT; i++) - startGrid(i); - - int lastGridIdx = GRID_CNT - 1; - - IgniteFuture<?> fut = taskStarter.apply(grid(lastGridIdx).forPredicate(excludeLastPredicate())); - - jobLatch.await(); - - stopGrid(lastGridIdx, true); - - latch.countDown(); - - assert invokeLatch.await(5000, MILLISECONDS); - - try { - fut.get(); - } - catch (IgniteCheckedException e) { - log.debug("Task failed: " + e); - } - } - - /** - */ - private static class TestMasterLeaveAware { - /** */ - private final CountDownLatch latch0 = new CountDownLatch(1); - - /** - * @param log Logger. - */ - private void execute(IgniteLogger log) { - try { - log.info("Started execute."); - - // Countdown shared job latch so that the main thread know that all jobs are - // inside the "execute" routine. - jobLatch.countDown(); - - log.info("After job latch."); - - // Await for the main thread to allow jobs to proceed. - latch.await(); - - log.info("After latch."); - - if (awaitMasterLeaveCallback) { - latch0.await(); - - log.info("After latch0."); - } - else - log.info("Latch 0 skipped."); - } - catch (InterruptedException e) { - // We do not expect any interruptions here, hence this statement. - fail("Unexpected exception: " + e); - } - } - - /** - * @param log Logger. - * @param job Actual job. - */ - private void onMasterLeave(IgniteLogger log, Object job) { - log.info("Callback executed: " + job); - - latch0.countDown(); - - invokeLatch.countDown(); - } - } - - /** - * Master leave aware callable. - */ - private static class TestCallable implements Callable<Void>, ComputeJobMasterLeaveAware { - /** Task session. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** */ - private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware(); - - /** {@inheritDoc} */ - @Override public Void call() throws Exception { - masterLeaveAware.execute(log); - - return null; - } - - /** {@inheritDoc} */ - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - masterLeaveAware.onMasterLeave(log, this); - } - } - - /** - * Master leave aware runnable. - */ - private static class TestRunnable implements Runnable, ComputeJobMasterLeaveAware { - /** Task session. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** */ - private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware(); - - /** {@inheritDoc} */ - @Override public void run() { - masterLeaveAware.execute(log); - } - - /** {@inheritDoc} */ - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - masterLeaveAware.onMasterLeave(log, this); - } - } - - /** - * Master leave aware closure. - */ - private static class TestClosure implements IgniteClosure<String, Void>, ComputeJobMasterLeaveAware { - /** Task session. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** */ - private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware(); - - /** {@inheritDoc} */ - @Override public Void apply(String arg) { - masterLeaveAware.execute(log); - - return null; - } - - /** {@inheritDoc} */ - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - masterLeaveAware.onMasterLeave(log, this); - } - } - - /** - * Base implementation of dummy task which produces predefined amount of test jobs on split stage. - */ - private static class TestTask extends ComputeTaskSplitAdapter<String, Integer> { - /** How many jobs to produce. */ - private int jobCnt; - - /** */ - @IgniteTaskSessionResource - private ComputeTaskSession taskSes; - - /** - * Constructor. - * - * @param jobCnt How many jobs to produce on split stage. - */ - private TestTask(int jobCnt) { - this.jobCnt = jobCnt; - } - - /** {@inheritDoc} */ - @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteCheckedException { - Collection<ComputeJobAdapter> jobs = new ArrayList<>(jobCnt); - - for (int i = 0; i < jobCnt; i++) - jobs.add(new TestJob()); - - return jobs; - } - - /** {@inheritDoc} */ - @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - return null; - } - } - - /** - * Base implementation of dummy test job. - */ - private static class TestJob extends ComputeJobAdapter implements ComputeJobMasterLeaveAware { - /** Task session. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** */ - private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware(); - - /** - * Constructor - */ - private TestJob() { - super(new Object()); - } - - /** {@inheritDoc} */ - @Override public Object execute() throws IgniteCheckedException { - masterLeaveAware.execute(log); - - return null; - } - - /** {@inheritDoc} */ - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - masterLeaveAware.onMasterLeave(log, this); - } - } - - /** - * Communication SPI which could optionally block outgoing messages. - */ - private static class CommunicationSpi extends TcpCommunicationSpi { - /** Whether to block all outgoing messages. */ - private volatile boolean block; - - /** Job execution response latch. */ - private CountDownLatch respLatch = new CountDownLatch(1); - - /** Whether to wait for a wait latch before returning. */ - private volatile boolean wait; - - /** Wait latch. */ - private CountDownLatch waitLatch = new CountDownLatch(1); - - /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) - throws IgniteSpiException { - sendMessage0(node, msg); - } - - /** - * Send message optionally either blocking it or throwing an exception if it is of - * {@link GridJobExecuteResponse} type. - * - * @param node Destination node. - * @param msg Message to be sent. - * @throws org.apache.ignite.spi.IgniteSpiException If failed. - */ - private void sendMessage0(ClusterNode node, GridTcpCommunicationMessageAdapter msg) throws IgniteSpiException { - if (msg instanceof GridIoMessage) { - GridIoMessage msg0 = (GridIoMessage)msg; - - if (msg0.message() instanceof GridJobExecuteResponse) { - respLatch.countDown(); - - if (wait) { - try { - U.await(waitLatch); - } - catch (IgniteInterruptedException ignore) { - // No-op. - } - } - } - } - - if (!block) - super.sendMessage(node, msg); - } - - /** - * Block all outgoing message. - */ - void blockMessages() { - block = true; - } - - /** - * Whether to block on a wait latch. - */ - private void waitLatch() { - wait = true; - } - - /** - * Count down wait latch. - */ - private void releaseWaitLatch() { - waitLatch.countDown(); - } - - /** - * Await for job execution response to come. - * - * @throws org.apache.ignite.IgniteInterruptedException If interrupted. - */ - private void awaitResponse() throws IgniteInterruptedException { - U.await(respLatch); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobStealingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobStealingSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobStealingSelfTest.java deleted file mode 100644 index 151320a..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobStealingSelfTest.java +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.collision.jobstealing.*; -import org.apache.ignite.spi.failover.jobstealing.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.config.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -/** - * Job stealing test. - */ -@SuppressWarnings("unchecked") -@GridCommonTest(group = "Kernal Self") -public class GridJobStealingSelfTest extends GridCommonAbstractTest { - /** Task execution timeout in milliseconds. */ - private static final int TASK_EXEC_TIMEOUT_MS = 50000; - - /** */ - private Ignite ignite1; - - /** */ - private Ignite ignite2; - - /** Job distribution map. Records which job has run on which node. */ - private static Map<UUID, Collection<ComputeJob>> jobDistrMap = new HashMap<>(); - - /** */ - public GridJobStealingSelfTest() { - super(false /* don't start grid*/); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - jobDistrMap.clear(); - - ignite1 = startGrid(1); - - ignite2 = startGrid(2); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - ignite1 = null; - ignite2 = null; - } - - /** - * Test 2 jobs on 1 node. - * - * @throws IgniteCheckedException If test failed. - */ - public void testTwoJobs() throws IgniteCheckedException { - executeAsync(ignite1.compute(), new JobStealingSingleNodeTask(2), null).get(TASK_EXEC_TIMEOUT_MS); - - // Verify that 1 job was stolen by second node. - assertEquals(2, jobDistrMap.keySet().size()); - assertEquals(1, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); - assertEquals(1, jobDistrMap.get(ignite2.cluster().localNode().id()).size()); - } - - /** - * Test 2 jobs on 1 node with null predicate. - * - * @throws IgniteCheckedException If test failed. - */ - @SuppressWarnings("NullArgumentToVariableArgMethod") - public void testTwoJobsNullPredicate() throws IgniteCheckedException { - executeAsync(ignite1.compute(), new JobStealingSingleNodeTask(2), null).get(TASK_EXEC_TIMEOUT_MS); - - // Verify that 1 job was stolen by second node. - assertEquals(2, jobDistrMap.keySet().size()); - assertEquals(1, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); - assertEquals(1, jobDistrMap.get(ignite2.cluster().localNode().id()).size()); - } - - /** - * Test 2 jobs on 1 node with null predicate using string task name. - * - * @throws IgniteCheckedException If test failed. - */ - @SuppressWarnings("NullArgumentToVariableArgMethod") - public void testTwoJobsTaskNameNullPredicate() throws IgniteCheckedException { - executeAsync(ignite1.compute(), JobStealingSingleNodeTask.class.getName(), null).get(TASK_EXEC_TIMEOUT_MS); - - // Verify that 1 job was stolen by second node. - assertEquals(2, jobDistrMap.keySet().size()); - assertEquals(1, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); - assertEquals(1, jobDistrMap.get(ignite2.cluster().localNode().id()).size()); - } - - /** - * Test 2 jobs on 1 node when one of the predicates is null. - * - * @throws IgniteCheckedException If test failed. - */ - @SuppressWarnings("unchecked") - public void testTwoJobsPartiallyNullPredicate() throws IgniteCheckedException { - IgnitePredicate<ClusterNode> topPred = new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return ignite2.cluster().localNode().id().equals(e.id()); // Limit projection with only grid2. - } - }; - - executeAsync(compute(ignite1.cluster().forPredicate(topPred)).withTimeout(TASK_EXEC_TIMEOUT_MS), - new JobStealingSpreadTask(2), null).get(TASK_EXEC_TIMEOUT_MS); - - assertEquals(1, jobDistrMap.keySet().size()); - assertEquals(2, jobDistrMap.get(ignite2.cluster().localNode().id()).size()); - assertFalse(jobDistrMap.containsKey(ignite1.cluster().localNode().id())); - } - - /** - * Tests that projection predicate is taken into account by Stealing SPI. - * - * @throws Exception If failed. - */ - public void testProjectionPredicate() throws Exception { - final Ignite ignite3 = startGrid(3); - - executeAsync(compute(ignite1.cluster().forPredicate(new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return ignite1.cluster().localNode().id().equals(e.id()) || - ignite3.cluster().localNode().id().equals(e.id()); // Limit projection with only grid1 or grid3 node. - } - })), new JobStealingSpreadTask(4), null).get(TASK_EXEC_TIMEOUT_MS); - - // Verify that jobs were run only on grid1 and grid3 (not on grid2) - assertEquals(2, jobDistrMap.keySet().size()); - assertEquals(2, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); - assertEquals(2, jobDistrMap.get(ignite3.cluster().localNode().id()).size()); - assertFalse(jobDistrMap.containsKey(ignite2.cluster().localNode().id())); - } - - /** - * Tests that projection predicate is taken into account by Stealing SPI, - * and that jobs in projection can steal tasks from each other. - * - * @throws Exception If failed. - */ - public void testProjectionPredicateInternalStealing() throws Exception { - final Ignite ignite3 = startGrid(3); - - IgnitePredicate<ClusterNode> p = new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return ignite1.cluster().localNode().id().equals(e.id()) || - ignite3.cluster().localNode().id().equals(e.id()); // Limit projection with only grid1 or grid3 node. - } - }; - - executeAsync(compute(ignite1.cluster().forPredicate(p)), new JobStealingSingleNodeTask(4), null).get(TASK_EXEC_TIMEOUT_MS); - - // Verify that jobs were run only on grid1 and grid3 (not on grid2) - assertEquals(2, jobDistrMap.keySet().size()); - assertFalse(jobDistrMap.containsKey(ignite2.cluster().localNode().id())); - } - - /** - * Tests that a job is not cancelled if there are no - * available thief nodes in topology. - * - * @throws Exception If failed. - */ - public void testSingleNodeTopology() throws Exception { - IgnitePredicate<ClusterNode> p = new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return ignite1.cluster().localNode().id().equals(e.id()); // Limit projection with only grid1 node. - } - }; - - executeAsync(compute(ignite1.cluster().forPredicate(p)), new JobStealingSpreadTask(2), null). - get(TASK_EXEC_TIMEOUT_MS); - - assertEquals(1, jobDistrMap.keySet().size()); - assertEquals(2, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); - } - - /** - * Tests that a job is not cancelled if there are no - * available thief nodes in projection. - * - * @throws Exception If failed. - */ - public void testSingleNodeProjection() throws Exception { - ClusterGroup prj = ignite1.cluster().forNodeIds(Collections.singleton(ignite1.cluster().localNode().id())); - - executeAsync(compute(prj), new JobStealingSpreadTask(2), null).get(TASK_EXEC_TIMEOUT_MS); - - assertEquals(1, jobDistrMap.keySet().size()); - assertEquals(2, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); - } - - /** - * Tests that a job is not cancelled if there are no - * available thief nodes in projection. Uses null predicate. - * - * @throws Exception If failed. - */ - @SuppressWarnings("NullArgumentToVariableArgMethod") - public void testSingleNodeProjectionNullPredicate() throws Exception { - ClusterGroup prj = ignite1.cluster().forNodeIds(Collections.singleton(ignite1.cluster().localNode().id())); - - executeAsync(compute(prj).withTimeout(TASK_EXEC_TIMEOUT_MS), new JobStealingSpreadTask(2), null). - get(TASK_EXEC_TIMEOUT_MS); - - assertEquals(1, jobDistrMap.keySet().size()); - assertEquals(2, jobDistrMap.get(ignite1.cluster().localNode().id()).size()); - } - - /** - * Tests job stealing with peer deployment and different class loaders. - * - * @throws Exception If failed. - */ - @SuppressWarnings("unchecked") - public void testProjectionPredicateDifferentClassLoaders() throws Exception { - final Ignite ignite3 = startGrid(3); - - URL[] clsLdrUrls; - try { - clsLdrUrls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))}; - } - catch (MalformedURLException e) { - throw new RuntimeException("Define property p2p.uri.cls", e); - } - - ClassLoader ldr1 = new URLClassLoader(clsLdrUrls, getClass().getClassLoader()); - - Class taskCls = ldr1.loadClass("org.gridgain.grid.tests.p2p.JobStealingTask"); - Class nodeFilterCls = ldr1.loadClass("org.gridgain.grid.tests.p2p.GridExcludeNodeFilter"); - - IgnitePredicate<ClusterNode> nodeFilter = (IgnitePredicate<ClusterNode>)nodeFilterCls - .getConstructor(UUID.class).newInstance(ignite2.cluster().localNode().id()); - - Map<UUID, Integer> ret = (Map<UUID, Integer>)executeAsync(compute(ignite1.cluster().forPredicate(nodeFilter)), - taskCls, null).get(TASK_EXEC_TIMEOUT_MS); - - assert ret != null; - assert ret.get(ignite1.cluster().localNode().id()) != null && ret.get(ignite1.cluster().localNode().id()) == 2 : - ret.get(ignite1.cluster().localNode().id()); - assert ret.get(ignite3.cluster().localNode().id()) != null && ret.get(ignite3.cluster().localNode().id()) == 2 : - ret.get(ignite3.cluster().localNode().id()); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi(); - - // One job at a time. - colSpi.setActiveJobsThreshold(1); - colSpi.setWaitJobsThreshold(0); - - JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi(); - - // Verify defaults. - assert failSpi.getMaximumFailoverAttempts() == JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS; - - cfg.setCollisionSpi(colSpi); - cfg.setFailoverSpi(failSpi); - - cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); - - return cfg; - } - - /** - * Job stealing task, that spreads jobs equally over the grid. - */ - private static class JobStealingSpreadTask extends ComputeTaskAdapter<Object, Object> { - /** Grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Number of jobs to spawn from task. */ - protected final int nJobs; - - /** - * Constructs a new task instance. - * - * @param nJobs Number of jobs to spawn from this task. - */ - JobStealingSpreadTask(int nJobs) { - this.nJobs = nJobs; - } - - /** {@inheritDoc} */ - @SuppressWarnings("ForLoopReplaceableByForEach") - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable Object arg) throws IgniteCheckedException { - //assert subgrid.size() == 2 : "Invalid subgrid size: " + subgrid.size(); - - Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size()); - - Iterator<ClusterNode> subIter = subgrid.iterator(); - - // Spread jobs over subgrid. - for (int i = 0; i < nJobs; i++) { - if (!subIter.hasNext()) - subIter = subgrid.iterator(); // wrap around - - map.put(new GridJobStealingJob(5000L), subIter.next()); - } - - return map; - } - - /** {@inheritDoc} */ - @SuppressWarnings("SuspiciousMethodCalls") - @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - for (ComputeJobResult res : results) { - log.info("Job result: " + res.getData()); - } - - return null; - } - } - - /** - * Job stealing task, that puts all jobs onto one node. - */ - private static class JobStealingSingleNodeTask extends JobStealingSpreadTask { - /** {@inheritDoc} */ - JobStealingSingleNodeTask(int nJobs) { - super(nJobs); - } - - /** - * Default constructor. - * - * Uses 2 jobs. - */ - JobStealingSingleNodeTask() { - super(2); - } - - /** {@inheritDoc} */ - @SuppressWarnings("ForLoopReplaceableByForEach") - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable Object arg) throws IgniteCheckedException { - assert subgrid.size() > 1 : "Invalid subgrid size: " + subgrid.size(); - - Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size()); - - // Put all jobs onto one node. - for (int i = 0; i < nJobs; i++) - map.put(new GridJobStealingJob(5000L), subgrid.get(0)); - - return map; - } - } - - /** - * Job stealing job. - */ - private static final class GridJobStealingJob extends ComputeJobAdapter { - /** Injected grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** - * @param arg Job argument. - */ - GridJobStealingJob(Long arg) { - super(arg); - } - - /** {@inheritDoc} */ - @Override public Serializable execute() throws IgniteCheckedException { - log.info("Started job on node: " + ignite.cluster().localNode().id()); - - if (!jobDistrMap.containsKey(ignite.cluster().localNode().id())) { - Collection<ComputeJob> jobs = new ArrayList<>(); - jobs.add(this); - - jobDistrMap.put(ignite.cluster().localNode().id(), jobs); - } - else - jobDistrMap.get(ignite.cluster().localNode().id()).add(this); - - try { - Long sleep = argument(0); - - assert sleep != null; - - Thread.sleep(sleep); - } - catch (InterruptedException e) { - log.info("Job got interrupted on node: " + ignite.cluster().localNode().id()); - - throw new IgniteCheckedException("Job got interrupted.", e); - } - finally { - log.info("Job finished on node: " + ignite.cluster().localNode().id()); - } - - return ignite.cluster().localNode().id(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobStealingZeroActiveJobsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobStealingZeroActiveJobsSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobStealingZeroActiveJobsSelfTest.java deleted file mode 100644 index f39ce32..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobStealingZeroActiveJobsSelfTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.collision.jobstealing.*; -import org.apache.ignite.spi.failover.jobstealing.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Job stealing test. - */ -@GridCommonTest(group = "Kernal Self") -public class GridJobStealingZeroActiveJobsSelfTest extends GridCommonAbstractTest { - /** */ - private static Ignite ignite1; - - /** */ - private static Ignite ignite2; - - /** */ - public GridJobStealingZeroActiveJobsSelfTest() { - super(false /* don't start grid*/); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - ignite1 = startGrid(1); - ignite2 = startGrid(2); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - ignite1 = null; - - stopGrid(1); - stopGrid(2); - } - - /** - * Test 2 jobs on 2 nodes. - * - * @throws IgniteCheckedException If test failed. - */ - public void testTwoJobs() throws IgniteCheckedException { - ignite1.compute().execute(JobStealingTask.class, null); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi(); - - // One job at a time. - colSpi.setActiveJobsThreshold(gridName.endsWith("1") ? 0 : 2); - colSpi.setWaitJobsThreshold(0); - - JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi(); - - // Verify defaults. - assert failSpi.getMaximumFailoverAttempts() == JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS; - - cfg.setCollisionSpi(colSpi); - cfg.setFailoverSpi(failSpi); - - return cfg; - } - - /** */ - @SuppressWarnings({"PublicInnerClass"}) - public static class JobStealingTask extends ComputeTaskAdapter<Object, Object> { - /** Grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg) throws IgniteCheckedException { - assert subgrid.size() == 2 : "Invalid subgrid size: " + subgrid.size(); - - Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size()); - - // Put all jobs onto local node. - for (Iterator iter = subgrid.iterator(); iter.hasNext(); iter.next()) - map.put(new GridJobStealingJob(5000L), ignite.cluster().localNode()); - - return map; - } - - /** {@inheritDoc} */ - @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - assert results.size() == 2; - - for (ComputeJobResult res : results) { - log.info("Job result: " + res.getData()); - } - - String name1 = results.get(0).getData(); - String name2 = results.get(1).getData(); - - assert name1.equals(name2); - - assert !name1.equals(ignite1.name()); - assert name1.equals(ignite2.name()); - - return null; - } - } - - /** - * - */ - @SuppressWarnings({"PublicInnerClass"}) - public static final class GridJobStealingJob extends ComputeJobAdapter { - /** Injected grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** - * @param arg Job argument. - */ - GridJobStealingJob(Long arg) { - super(arg); - } - - /** {@inheritDoc} */ - @Override public Serializable execute() throws IgniteCheckedException { - try { - Long sleep = argument(0); - - assert sleep != null; - - Thread.sleep(sleep); - } - catch (InterruptedException e) { - throw new IgniteCheckedException("Job got interrupted.", e); - } - - return ignite.name(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobSubjectIdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobSubjectIdSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobSubjectIdSelfTest.java deleted file mode 100644 index d5d55eb..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridJobSubjectIdSelfTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Test job subject ID propagation. - */ -public class GridJobSubjectIdSelfTest extends GridCommonAbstractTest { - /** Job subject ID. */ - private static volatile UUID taskSubjId; - - /** Job subject ID. */ - private static volatile UUID jobSubjId; - - /** Event subject ID. */ - private static volatile UUID evtSubjId; - - /** First node. */ - private Ignite node1; - - /** Second node. */ - private Ignite node2; - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - node1 = startGrid(1); - node2 = startGrid(2); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - node1 = null; - node2 = null; - } - - /** - * Test job subject ID propagation. - * - * @throws Exception If failed. - */ - public void testJobSubjectId() throws Exception { - node2.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - IgniteJobEvent evt0 = (IgniteJobEvent)evt; - - assert evtSubjId == null; - - evtSubjId = evt0.taskSubjectId(); - - return false; - } - }, IgniteEventType.EVT_JOB_STARTED); - - node1.compute().execute(new Task(node2.cluster().localNode().id()), null); - - assertEquals(taskSubjId, jobSubjId); - assertEquals(taskSubjId, evtSubjId); - } - - /** - * Task class. - */ - @SuppressWarnings("PublicInnerClass") - public static class Task extends ComputeTaskAdapter<Object, Object> { - /** Target node ID. */ - private UUID targetNodeId; - - /** Session. */ - @IgniteTaskSessionResource - private ComputeTaskSession ses; - - /** - * Constructor. - * - * @param targetNodeId Target node ID. - */ - public Task(UUID targetNodeId) { - this.targetNodeId = targetNodeId; - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable Object arg) throws IgniteCheckedException { - taskSubjId = ((GridTaskSessionInternal)ses).subjectId(); - - ClusterNode node = null; - - for (ClusterNode subgridNode : subgrid) { - if (F.eq(targetNodeId, subgridNode.id())) { - node = subgridNode; - - break; - } - } - - assert node != null; - - return Collections.singletonMap(new Job(), node); - } - - /** {@inheritDoc} */ - @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - return null; - } - } - - /** - * Job class. - */ - @SuppressWarnings("PublicInnerClass") - public static class Job extends ComputeJobAdapter { - /** Session. */ - @IgniteTaskSessionResource - private ComputeTaskSession ses; - - /** {@inheritDoc} */ - @Nullable @Override public Object execute() throws IgniteCheckedException { - jobSubjId = ((GridTaskSessionInternal)ses).subjectId(); - - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridKernalConcurrentAccessStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridKernalConcurrentAccessStopSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridKernalConcurrentAccessStopSelfTest.java deleted file mode 100644 index 3a8a7d2..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridKernalConcurrentAccessStopSelfTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.testframework.junits.common.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Tests kernal stop while it is being accessed from asynchronous even listener. - */ -public class GridKernalConcurrentAccessStopSelfTest extends GridCommonAbstractTest { - /** Grid count. */ - private static final int GRIDS = 2; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - for (int i = 0; i < GRIDS; i++) - startGrid(i); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - for (int i = GRIDS; i-- >= 0;) - stopGrid(i); - } - - /** - * - */ - public void testConcurrentAccess() { - for (int i = 0; i < GRIDS; i++) { - grid(i).events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - return true; - } - }, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridKernalTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridKernalTestUtils.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridKernalTestUtils.java deleted file mode 100644 index f3d2b53..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridKernalTestUtils.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.Ignite; -import org.apache.ignite.internal.*; - -/** - * Test kernal utils. - */ -public class GridKernalTestUtils { - /** - * Ensures singleton. - */ - private GridKernalTestUtils() { - /* No-op. */ - } - - /** - * Returns context by grid. - * - * @param ignite Grid. - * @return Kernal context. - */ - public static GridKernalContext context(Ignite ignite) { - assert ignite != null; - - return ((GridKernal) ignite).context(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridLifecycleAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridLifecycleAwareSelfTest.java deleted file mode 100644 index 72a80fd..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridLifecycleAwareSelfTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lifecycle.*; -import org.apache.ignite.logger.java.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.plugin.segmentation.*; -import org.apache.ignite.client.ssl.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import javax.net.ssl.*; - -/** - * Test for {@link org.apache.ignite.lifecycle.LifecycleAware} support in {@link org.apache.ignite.configuration.IgniteConfiguration}. - */ -public class GridLifecycleAwareSelfTest extends GridAbstractLifecycleAwareSelfTest { - /** - */ - private static class TestClientMessageInterceptor extends TestLifecycleAware - implements ClientMessageInterceptor { - /** - */ - TestClientMessageInterceptor() { - super(null); - } - - /** {@inheritDoc} */ - @Nullable @Override public Object onReceive(@Nullable Object obj) { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public Object onSend(Object obj) { - return null; - } - } - - /** - */ - private static class TestSegmentationResolver extends TestLifecycleAware implements GridSegmentationResolver { - /** - */ - TestSegmentationResolver() { - super(null); - } - - /** {@inheritDoc} */ - @Override public boolean isValidSegment() throws IgniteCheckedException { - return true; - } - } - - /** - */ - private static class TestContextFactory extends TestLifecycleAware implements GridSslContextFactory { - /** - */ - TestContextFactory() { - super(null); - } - - /** {@inheritDoc} */ - @Override public SSLContext createSslContext() throws SSLException { - return null; - } - } - - /** - */ - private static class TestLifecycleBean extends TestLifecycleAware implements LifecycleBean { - /** - */ - TestLifecycleBean() { - super(null); - } - - /** {@inheritDoc} */ - @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { - // No-op. - } - } - - /** - */ - private static class TestMarshaller extends IgniteOptimizedMarshaller implements LifecycleAware { - /** */ - private final TestLifecycleAware lifecycleAware = new TestLifecycleAware(null); - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - lifecycleAware.start(); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteCheckedException { - lifecycleAware.stop(); - } - - /** - * @return Lifecycle aware. - */ - TestLifecycleAware lifecycleAware() { - return lifecycleAware; - } - } - - /** - */ - private static class TestLogger extends IgniteJavaLogger implements LifecycleAware { - /** */ - private final TestLifecycleAware lifecycleAware = new TestLifecycleAware(null); - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - lifecycleAware.start(); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteCheckedException { - lifecycleAware.stop(); - } - - /** - * @return Lifecycle aware. - */ - TestLifecycleAware lifecycleAware() { - return lifecycleAware; - } - } - - /** {@inheritDoc} */ - @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TestClientMessageInterceptor interceptor = new TestClientMessageInterceptor(); - - ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration(); - - clientCfg.setClientMessageInterceptor(interceptor); - - cfg.setClientConnectionConfiguration(clientCfg); - - lifecycleAwares.add(interceptor); - - TestSegmentationResolver segmentationRslvr = new TestSegmentationResolver(); - - cfg.setSegmentationResolvers(segmentationRslvr); - - lifecycleAwares.add(segmentationRslvr); - - TestContextFactory ctxFactory = new TestContextFactory(); - - clientCfg.setRestTcpSslContextFactory(ctxFactory); - - lifecycleAwares.add(ctxFactory); - - TestLifecycleBean lifecycleBean = new TestLifecycleBean(); - - cfg.setLifecycleBeans(lifecycleBean); - - lifecycleAwares.add(lifecycleBean); - - TestMarshaller marshaller = new TestMarshaller(); - - cfg.setMarshaller(marshaller); - - lifecycleAwares.add(marshaller.lifecycleAware()); - - TestLogger testLog = new TestLogger(); - - cfg.setGridLogger(testLog); - - lifecycleAwares.add(testLog.lifecycleAware()); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridLifecycleBeanSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridLifecycleBeanSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridLifecycleBeanSelfTest.java deleted file mode 100644 index 57551f6..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridLifecycleBeanSelfTest.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lifecycle.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.lifecycle.LifecycleEventType.*; - -/** - * Lifecycle bean test. - */ -@GridCommonTest(group = "Kernal Self") -public class GridLifecycleBeanSelfTest extends GridCommonAbstractTest { - /** */ - private LifeCycleBaseBean bean; - - /** */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - c.setLifecycleBeans(bean); - - return c; - } - - /** - * @throws Exception If failed. - */ - public void testNoErrors() throws Exception { - bean = new LifeCycleBaseBean(); - - startGrid(); - - try { - assertEquals(IgniteState.STARTED, G.state(getTestGridName())); - - assertEquals(1, bean.count(BEFORE_GRID_START)); - assertEquals(1, bean.count(AFTER_GRID_START)); - assertEquals(0, bean.count(BEFORE_GRID_STOP)); - assertEquals(0, bean.count(AFTER_GRID_STOP)); - } - finally { - stopAllGrids(); - } - - - assertEquals(IgniteState.STOPPED, G.state(getTestGridName())); - - assertEquals(1, bean.count(BEFORE_GRID_START)); - assertEquals(1, bean.count(AFTER_GRID_START)); - assertEquals(1, bean.count(BEFORE_GRID_STOP)); - assertEquals(1, bean.count(AFTER_GRID_STOP)); - } - - /** - * @throws Exception If failed. - */ - public void testGridErrorBeforeStart() throws Exception { - checkBeforeStart(true); - } - - /** - * @throws Exception If failed. - */ - public void testOtherErrorBeforeStart() throws Exception { - checkBeforeStart(false); - } - - /** - * @throws Exception If failed. - */ - public void testGridErrorAfterStart() throws Exception { - checkAfterStart(true); - } - - /** - * @throws Exception If failed. - */ - public void testOtherErrorAfterStart() throws Exception { - checkAfterStart(false); - } - - /** - * @param gridErr Grid error flag. - * @throws Exception If failed. - */ - private void checkBeforeStart(boolean gridErr) throws Exception { - bean = new LifeCycleExceptionBean(BEFORE_GRID_START, gridErr); - - try { - startGrid(); - - assertTrue(false); // Should never get here. - } - catch (IgniteCheckedException expected) { - info("Got expected exception: " + expected); - - assertEquals(IgniteState.STOPPED, G.state(getTestGridName())); - } - finally { - stopAllGrids(); - } - - assertEquals(0, bean.count(BEFORE_GRID_START)); - assertEquals(0, bean.count(AFTER_GRID_START)); - assertEquals(0, bean.count(BEFORE_GRID_STOP)); - assertEquals(1, bean.count(AFTER_GRID_STOP)); - } - - /** - * @param gridErr Grid error flag. - * @throws Exception If failed. - */ - private void checkAfterStart(boolean gridErr) throws Exception { - bean = new LifeCycleExceptionBean(AFTER_GRID_START, gridErr); - - try { - startGrid(); - - assertTrue(false); // Should never get here. - } - catch (IgniteCheckedException expected) { - info("Got expected exception: " + expected); - - assertEquals(IgniteState.STOPPED, G.state(getTestGridName())); - } - finally { - stopAllGrids(); - } - - assertEquals(1, bean.count(BEFORE_GRID_START)); - assertEquals(0, bean.count(AFTER_GRID_START)); - assertEquals(1, bean.count(BEFORE_GRID_STOP)); - assertEquals(1, bean.count(AFTER_GRID_STOP)); - } - - /** - * @throws Exception If failed. - */ - public void testGridErrorBeforeStop() throws Exception { - checkOnStop(BEFORE_GRID_STOP, true); - - assertEquals(1, bean.count(BEFORE_GRID_START)); - assertEquals(1, bean.count(AFTER_GRID_START)); - assertEquals(0, bean.count(BEFORE_GRID_STOP)); - assertEquals(1, bean.count(AFTER_GRID_STOP)); - } - - /** - * @throws Exception If failed. - */ - public void testOtherErrorBeforeStop() throws Exception { - checkOnStop(BEFORE_GRID_STOP, false); - - assertEquals(1, bean.count(BEFORE_GRID_START)); - assertEquals(1, bean.count(AFTER_GRID_START)); - assertEquals(0, bean.count(BEFORE_GRID_STOP)); - assertEquals(1, bean.count(AFTER_GRID_STOP)); - } - - /** - * @throws Exception If failed. - */ - public void testGridErrorAfterStop() throws Exception { - checkOnStop(AFTER_GRID_STOP, true); - - assertEquals(1, bean.count(BEFORE_GRID_START)); - assertEquals(1, bean.count(AFTER_GRID_START)); - assertEquals(1, bean.count(BEFORE_GRID_STOP)); - assertEquals(0, bean.count(AFTER_GRID_STOP)); - } - - /** - * @throws Exception If failed. - */ - public void testOtherErrorAfterStop() throws Exception { - checkOnStop(AFTER_GRID_STOP, false); - - assertEquals(1, bean.count(BEFORE_GRID_START)); - assertEquals(1, bean.count(AFTER_GRID_START)); - assertEquals(1, bean.count(BEFORE_GRID_STOP)); - assertEquals(0, bean.count(AFTER_GRID_STOP)); - } - - /** - * @param evt Error event. - * @param gridErr Grid error flag. - * @throws Exception If failed. - */ - private void checkOnStop(LifecycleEventType evt, boolean gridErr) throws Exception { - bean = new LifeCycleExceptionBean(evt, gridErr); - - try { - startGrid(); - - assertEquals(IgniteState.STARTED, G.state(getTestGridName())); - } - catch (IgniteCheckedException ignore) { - assertTrue(false); - } - finally { - try { - stopAllGrids(); - - assertEquals(IgniteState.STOPPED, G.state(getTestGridName())); - } - catch (Exception ignore) { - assertTrue(false); - } - } - } - - /** - * - */ - private static class LifeCycleBaseBean implements LifecycleBean { - /** */ - private Map<LifecycleEventType, AtomicInteger> callsCntr = - new EnumMap<>(LifecycleEventType.class); - - /** - * - */ - private LifeCycleBaseBean() { - for (LifecycleEventType t : LifecycleEventType.values()) - callsCntr.put(t, new AtomicInteger()); - } - - /** {@inheritDoc} */ - @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { - callsCntr.get(evt).incrementAndGet(); - } - - /** - * @param t Event type. - * @return Number of calls. - */ - public int count(LifecycleEventType t) { - return callsCntr.get(t).get(); - } - } - - /** - * - */ - private static class LifeCycleExceptionBean extends LifeCycleBaseBean { - /** */ - private LifecycleEventType errType; - - private boolean gridErr; - - /** - * @param errType type of event to throw error. - * @param gridErr {@code True} if {@link IgniteCheckedException}. - */ - private LifeCycleExceptionBean(LifecycleEventType errType, boolean gridErr) { - this.errType = errType; - this.gridErr = gridErr; - } - - /** {@inheritDoc} */ - @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { - if (evt == errType) { - if (gridErr) - throw new IgniteCheckedException("Expected exception for event: " + evt) { - @Override public void printStackTrace(PrintStream s) { - // No-op. - } - - @Override public void printStackTrace(PrintWriter s) { - // No-op. - } - }; - else - throw new RuntimeException("Expected exception for event: " + evt) { - @Override public void printStackTrace(PrintStream s) { - // No-op. - } - - @Override public void printStackTrace(PrintWriter s) { - // No-op. - } - }; - } - - super.onLifecycleEvent(evt); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridListenActorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridListenActorSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridListenActorSelfTest.java deleted file mode 100644 index ae3b841..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridListenActorSelfTest.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.messaging.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Tests for {@link org.apache.ignite.messaging.MessagingListenActor}. - */ -public class GridListenActorSelfTest extends GridCommonAbstractTest { - /** */ - private static final int MSG_QTY = 10; - - /** */ - private static final int PING_PONG_STEPS = 10; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGrid(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopGrid(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override protected void afterTest() throws Exception { - ((GridKernal)grid()).context().io(). - removeMessageListener(GridTopic.TOPIC_COMM_USER.name()); - } - - /** - * - * @throws Exception Thrown if failed. - */ - public void testBasicFlow() throws Exception { - final AtomicInteger cnt = new AtomicInteger(0); - - grid().message().localListen(null, new MessagingListenActor<String>() { - @Override - public void receive(UUID uuid, String rcvMsg) { - if ("TEST".equals(rcvMsg)) { - cnt.incrementAndGet(); - - // "Exit" after 1st message. - // Should never receive any more messages. - stop(); - } else { - assert false : "Unknown message: " + rcvMsg; - - stop(); - } - } - }); - - grid().message().send(null, "TEST"); // This message we should receive. - - // Flood it. - for (int i = 0; i < 100; i++) - grid().message().send(null, "TEST"); // This message should be lost... - - Thread.sleep(2000); - - assert cnt.get() == 1 : "Count is " + cnt.get(); - } - - /** - * @throws Exception If failed. - */ - public void testImmediateStop() throws Exception { - doSendReceive(MSG_QTY, 1); - } - - /** - * @throws Exception If failed. - */ - public void testReceiveAll() throws Exception { - doSendReceive(MSG_QTY, MSG_QTY); - } - - /** - * Testing {@link org.apache.ignite.messaging.MessagingListenActor#respond(UUID, Object)} method. - * - * @throws Exception If failed. - */ - public void testRespondToRemote() throws Exception { - startGrid(1); - - try { - final ClusterNode rmt = grid(1).localNode(); - - grid().message().localListen(null, new MessagingListenActor<String>() { - @Override protected void receive(UUID nodeId, String rcvMsg) throws IgniteCheckedException { - System.out.println("Local node received message: '" + rcvMsg + "'"); - - respond(rmt.id(), "RESPONSE"); - } - }); - - final AtomicInteger cnt = new AtomicInteger(); - - // Response listener - grid(1).message().localListen(null, new MessagingListenActor<String>() { - @Override public void receive(UUID nodeId, String rcvMsg) { - if ("RESPONSE".equals(rcvMsg)) { - System.out.println("Remote node received message: '" + rcvMsg + "'"); - - cnt.incrementAndGet(); - } - } - }); - - grid().message().send(null, "REQUEST"); - - assert GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return cnt.intValue() == 1; - } - }, getTestTimeout()); - } - finally { - stopGrid(1); - } - } - - /** - * @throws Exception If failed. - */ - public void testPingPong() throws Exception { - final AtomicInteger pingCnt = new AtomicInteger(); - final AtomicInteger pongCnt = new AtomicInteger(); - - final CountDownLatch latch = new CountDownLatch(PING_PONG_STEPS); - - grid().message().localListen(null, new MessagingListenActor<String>() { - @Override - protected void receive(UUID nodeId, String rcvMsg) throws IgniteCheckedException { - System.out.println("Received message: '" + rcvMsg + "'"); - - if ("PING".equals(rcvMsg)) { - pingCnt.incrementAndGet(); - - respond("PONG"); - } else if ("PONG".equals(rcvMsg)) { - pongCnt.incrementAndGet(); - - latch.countDown(); - - if (latch.getCount() > 0) - respond("PING"); - else - stop(); - } - } - }); - - grid().message().send(null, "PING"); - - latch.await(); - - assert pingCnt.intValue() == PING_PONG_STEPS; - assert pongCnt.intValue() == PING_PONG_STEPS; - } - - /** - * @param snd Sent messages quantity. - * @param rcv Max quantity of received messages before listener is removed. - * @throws Exception IF failed. - */ - private void doSendReceive(int snd, final int rcv) throws Exception { - assert rcv > 0; - assert snd >= 0; - - final AtomicInteger cnt = new AtomicInteger(0); - - grid().message().localListen(null, new MessagingListenActor<String>() { - @Override - protected void receive(UUID nodeId, String rcvMsg) { - System.out.println(Thread.currentThread().getName() + "# Received message: '" + rcvMsg + "'"); - - cnt.incrementAndGet(); - - if (cnt.intValue() == rcv) { - System.out.println(Thread.currentThread().getName() + "Calling stop..."); - - stop(); - } else if (cnt.intValue() < rcv) - skip(); - else - assert false; - } - }); - - for (int i = 1; i <= snd; i++) { - String msg = "MESSAGE " + i; - - grid().message().send(null, msg); - - System.out.println(Thread.currentThread().getName() + "# Sent message: '" + msg + "'"); - } - - Thread.sleep(2000); - - assert cnt.intValue() == rcv; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridLocalEventListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridLocalEventListenerSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridLocalEventListenerSelfTest.java deleted file mode 100644 index bd36c2a..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridLocalEventListenerSelfTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Test ensuring that event listeners are picked by started node. - */ -public class GridLocalEventListenerSelfTest extends GridCommonAbstractTest { - /** Whether event fired. */ - private final CountDownLatch fired = new CountDownLatch(1); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - int idx = getTestGridIndex(gridName); - - if (idx == 0) { - Map<IgnitePredicate<? extends IgniteEvent>, int[]> lsnrs = new HashMap<>(); - - lsnrs.put(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - fired.countDown(); - - return true; - } - }, new int[] { IgniteEventType.EVT_NODE_JOINED } ); - - cfg.setLocalEventListeners(lsnrs); - } - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** - * Test listeners notification. - * - * @throws Exception If failed. - */ - public void testListener() throws Exception { - startGrids(2); - - assert fired.await(5000, TimeUnit.MILLISECONDS); - } -}