// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobSubjectIdSelfTest.java
// ----------------------------------------------------------------------
// diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobSubjectIdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobSubjectIdSelfTest.java
// new file mode 100644
// index 0000000..d63056a
// --- /dev/null
// +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobSubjectIdSelfTest.java
// @@ -0,0 +1,153 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.testframework.junits.common.*;
import org.jetbrains.annotations.*;

import java.util.*;

/**
 * Checks that the subject ID seen by a task, by its job and by the job-started
 * event is one and the same value.
 */
public class GridJobSubjectIdSelfTest extends GridCommonAbstractTest {
    /** Subject ID read from the task session inside {@link Task#map(List, Object)}. */
    private static volatile UUID taskSubjId;

    /** Subject ID read from the task session inside {@link Job#execute()}. */
    private static volatile UUID jobSubjId;

    /** Task subject ID reported by the job-started event caught on the second node. */
    private static volatile UUID evtSubjId;

    /** First node; the task is submitted from here. */
    private Ignite node1;

    /** Second node; the job is mapped to it and the event listener is registered on it. */
    private Ignite node2;

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        node1 = startGrid(1);
        node2 = startGrid(2);
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();

        node1 = null;
        node2 = null;
    }

    /**
     * Runs a single task from the first node, targets its job at the second node and
     * compares the three recorded subject IDs.
     *
     * @throws Exception If failed.
     */
    public void testJobSubjectId() throws Exception {
        IgnitePredicate<IgniteEvent> startLsnr = new IgnitePredicate<IgniteEvent>() {
            @Override public boolean apply(IgniteEvent evt) {
                IgniteJobEvent jobEvt = (IgniteJobEvent)evt;

                // Only one job-started event is expected, so nothing may be recorded yet.
                assert evtSubjId == null;

                evtSubjId = jobEvt.taskSubjectId();

                return false;
            }
        };

        node2.events().localListen(startLsnr, IgniteEventType.EVT_JOB_STARTED);

        UUID targetId = node2.cluster().localNode().id();

        node1.compute().execute(new Task(targetId), null);

        assertEquals(taskSubjId, jobSubjId);
        assertEquals(taskSubjId, evtSubjId);
    }

    /**
     * Task that records the session subject ID and maps one {@link Job} to a given node.
     */
    @SuppressWarnings("PublicInnerClass")
    public static class Task extends ComputeTaskAdapter<Object, Object> {
        /** ID of the node the single job must be mapped to. */
        private UUID targetNodeId;

        /** Injected task session. */
        @IgniteTaskSessionResource
        private ComputeTaskSession ses;

        /**
         * Constructor.
         *
         * @param targetNodeId Target node ID.
         */
        public Task(UUID targetNodeId) {
            this.targetNodeId = targetNodeId;
        }

        /** {@inheritDoc} */
        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
            @Nullable Object arg) throws IgniteCheckedException {
            taskSubjId = ((GridTaskSessionInternal)ses).subjectId();

            ClusterNode target = null;

            for (ClusterNode candidate : subgrid) {
                if (!F.eq(targetNodeId, candidate.id()))
                    continue;

                target = candidate;

                break;
            }

            // The requested node has to be part of the subgrid.
            assert target != null;

            return Collections.singletonMap(new Job(), target);
        }

        /** {@inheritDoc} */
        @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
            return null;
        }
    }

    /**
     * Job that records the subject ID found in its task session.
     */
    @SuppressWarnings("PublicInnerClass")
    public static class Job extends ComputeJobAdapter {
        /** Injected task session. */
        @IgniteTaskSessionResource
        private ComputeTaskSession ses;

        /** {@inheritDoc} */
        @Nullable @Override public Object execute() throws IgniteCheckedException {
            jobSubjId = ((GridTaskSessionInternal)ses).subjectId();

            return null;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridKernalConcurrentAccessStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridKernalConcurrentAccessStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridKernalConcurrentAccessStopSelfTest.java new file mode 100644 index 0000000..8fbd9b1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridKernalConcurrentAccessStopSelfTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.junits.common.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Tests kernal stop while it is being accessed from asynchronous even listener. + */ +public class GridKernalConcurrentAccessStopSelfTest extends GridCommonAbstractTest { + /** Grid count. 
*/ + private static final int GRIDS = 2; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 0; i < GRIDS; i++) + startGrid(i); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + for (int i = GRIDS; i-- >= 0;) + stopGrid(i); + } + + /** + * + */ + public void testConcurrentAccess() { + for (int i = 0; i < GRIDS; i++) { + grid(i).events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return true; + } + }, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridKernalTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridKernalTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/GridKernalTestUtils.java new file mode 100644 index 0000000..6a473f9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridKernalTestUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.*; + +/** + * Test kernal utils. + */ +public class GridKernalTestUtils { + /** + * Ensures singleton. + */ + private GridKernalTestUtils() { + /* No-op. */ + } + + /** + * Returns context by grid. + * + * @param ignite Grid. + * @return Kernal context. + */ + public static GridKernalContext context(Ignite ignite) { + assert ignite != null; + + return ((GridKernal) ignite).context(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleAwareSelfTest.java new file mode 100644 index 0000000..5f6487f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleAwareSelfTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.logger.java.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.plugin.segmentation.*; +import org.apache.ignite.client.ssl.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.net.ssl.*; + +/** + * Test for {@link org.apache.ignite.lifecycle.LifecycleAware} support in {@link org.apache.ignite.configuration.IgniteConfiguration}. + */ +public class GridLifecycleAwareSelfTest extends GridAbstractLifecycleAwareSelfTest { + /** + */ + private static class TestClientMessageInterceptor extends TestLifecycleAware + implements ClientMessageInterceptor { + /** + */ + TestClientMessageInterceptor() { + super(null); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object onReceive(@Nullable Object obj) { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object onSend(Object obj) { + return null; + } + } + + /** + */ + private static class TestSegmentationResolver extends TestLifecycleAware implements GridSegmentationResolver { + /** + */ + TestSegmentationResolver() { + super(null); + } + + /** {@inheritDoc} */ + @Override public boolean isValidSegment() throws IgniteCheckedException { + return true; + } + } + + /** + */ + private static class TestContextFactory extends TestLifecycleAware implements GridSslContextFactory { + /** + */ + TestContextFactory() { + super(null); + } + + /** {@inheritDoc} */ + @Override public SSLContext createSslContext() throws SSLException { + return null; + } + } + + /** + */ + private static class TestLifecycleBean extends TestLifecycleAware implements LifecycleBean { + /** + */ + TestLifecycleBean() { + super(null); + } + + /** {@inheritDoc} 
*/ + @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { + // No-op. + } + } + + /** + */ + private static class TestMarshaller extends IgniteOptimizedMarshaller implements LifecycleAware { + /** */ + private final TestLifecycleAware lifecycleAware = new TestLifecycleAware(null); + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + lifecycleAware.start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteCheckedException { + lifecycleAware.stop(); + } + + /** + * @return Lifecycle aware. + */ + TestLifecycleAware lifecycleAware() { + return lifecycleAware; + } + } + + /** + */ + private static class TestLogger extends IgniteJavaLogger implements LifecycleAware { + /** */ + private final TestLifecycleAware lifecycleAware = new TestLifecycleAware(null); + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + lifecycleAware.start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteCheckedException { + lifecycleAware.stop(); + } + + /** + * @return Lifecycle aware. 
+ */ + TestLifecycleAware lifecycleAware() { + return lifecycleAware; + } + } + + /** {@inheritDoc} */ + @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TestClientMessageInterceptor interceptor = new TestClientMessageInterceptor(); + + ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration(); + + clientCfg.setClientMessageInterceptor(interceptor); + + cfg.setClientConnectionConfiguration(clientCfg); + + lifecycleAwares.add(interceptor); + + TestSegmentationResolver segmentationRslvr = new TestSegmentationResolver(); + + cfg.setSegmentationResolvers(segmentationRslvr); + + lifecycleAwares.add(segmentationRslvr); + + TestContextFactory ctxFactory = new TestContextFactory(); + + clientCfg.setRestTcpSslContextFactory(ctxFactory); + + lifecycleAwares.add(ctxFactory); + + TestLifecycleBean lifecycleBean = new TestLifecycleBean(); + + cfg.setLifecycleBeans(lifecycleBean); + + lifecycleAwares.add(lifecycleBean); + + TestMarshaller marshaller = new TestMarshaller(); + + cfg.setMarshaller(marshaller); + + lifecycleAwares.add(marshaller.lifecycleAware()); + + TestLogger testLog = new TestLogger(); + + cfg.setGridLogger(testLog); + + lifecycleAwares.add(testLog.lifecycleAware()); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleBeanSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleBeanSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleBeanSelfTest.java new file mode 100644 index 0000000..b177610 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleBeanSelfTest.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.lifecycle.LifecycleEventType.*; + +/** + * Lifecycle bean test. + */ +@GridCommonTest(group = "Kernal Self") +public class GridLifecycleBeanSelfTest extends GridCommonAbstractTest { + /** */ + private LifeCycleBaseBean bean; + + /** */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setLifecycleBeans(bean); + + return c; + } + + /** + * @throws Exception If failed. 
+ */ + public void testNoErrors() throws Exception { + bean = new LifeCycleBaseBean(); + + startGrid(); + + try { + assertEquals(IgniteState.STARTED, G.state(getTestGridName())); + + assertEquals(1, bean.count(BEFORE_GRID_START)); + assertEquals(1, bean.count(AFTER_GRID_START)); + assertEquals(0, bean.count(BEFORE_GRID_STOP)); + assertEquals(0, bean.count(AFTER_GRID_STOP)); + } + finally { + stopAllGrids(); + } + + + assertEquals(IgniteState.STOPPED, G.state(getTestGridName())); + + assertEquals(1, bean.count(BEFORE_GRID_START)); + assertEquals(1, bean.count(AFTER_GRID_START)); + assertEquals(1, bean.count(BEFORE_GRID_STOP)); + assertEquals(1, bean.count(AFTER_GRID_STOP)); + } + + /** + * @throws Exception If failed. + */ + public void testGridErrorBeforeStart() throws Exception { + checkBeforeStart(true); + } + + /** + * @throws Exception If failed. + */ + public void testOtherErrorBeforeStart() throws Exception { + checkBeforeStart(false); + } + + /** + * @throws Exception If failed. + */ + public void testGridErrorAfterStart() throws Exception { + checkAfterStart(true); + } + + /** + * @throws Exception If failed. + */ + public void testOtherErrorAfterStart() throws Exception { + checkAfterStart(false); + } + + /** + * @param gridErr Grid error flag. + * @throws Exception If failed. + */ + private void checkBeforeStart(boolean gridErr) throws Exception { + bean = new LifeCycleExceptionBean(BEFORE_GRID_START, gridErr); + + try { + startGrid(); + + assertTrue(false); // Should never get here. + } + catch (IgniteCheckedException expected) { + info("Got expected exception: " + expected); + + assertEquals(IgniteState.STOPPED, G.state(getTestGridName())); + } + finally { + stopAllGrids(); + } + + assertEquals(0, bean.count(BEFORE_GRID_START)); + assertEquals(0, bean.count(AFTER_GRID_START)); + assertEquals(0, bean.count(BEFORE_GRID_STOP)); + assertEquals(1, bean.count(AFTER_GRID_STOP)); + } + + /** + * @param gridErr Grid error flag. + * @throws Exception If failed. 
+ */ + private void checkAfterStart(boolean gridErr) throws Exception { + bean = new LifeCycleExceptionBean(AFTER_GRID_START, gridErr); + + try { + startGrid(); + + assertTrue(false); // Should never get here. + } + catch (IgniteCheckedException expected) { + info("Got expected exception: " + expected); + + assertEquals(IgniteState.STOPPED, G.state(getTestGridName())); + } + finally { + stopAllGrids(); + } + + assertEquals(1, bean.count(BEFORE_GRID_START)); + assertEquals(0, bean.count(AFTER_GRID_START)); + assertEquals(1, bean.count(BEFORE_GRID_STOP)); + assertEquals(1, bean.count(AFTER_GRID_STOP)); + } + + /** + * @throws Exception If failed. + */ + public void testGridErrorBeforeStop() throws Exception { + checkOnStop(BEFORE_GRID_STOP, true); + + assertEquals(1, bean.count(BEFORE_GRID_START)); + assertEquals(1, bean.count(AFTER_GRID_START)); + assertEquals(0, bean.count(BEFORE_GRID_STOP)); + assertEquals(1, bean.count(AFTER_GRID_STOP)); + } + + /** + * @throws Exception If failed. + */ + public void testOtherErrorBeforeStop() throws Exception { + checkOnStop(BEFORE_GRID_STOP, false); + + assertEquals(1, bean.count(BEFORE_GRID_START)); + assertEquals(1, bean.count(AFTER_GRID_START)); + assertEquals(0, bean.count(BEFORE_GRID_STOP)); + assertEquals(1, bean.count(AFTER_GRID_STOP)); + } + + /** + * @throws Exception If failed. + */ + public void testGridErrorAfterStop() throws Exception { + checkOnStop(AFTER_GRID_STOP, true); + + assertEquals(1, bean.count(BEFORE_GRID_START)); + assertEquals(1, bean.count(AFTER_GRID_START)); + assertEquals(1, bean.count(BEFORE_GRID_STOP)); + assertEquals(0, bean.count(AFTER_GRID_STOP)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testOtherErrorAfterStop() throws Exception { + checkOnStop(AFTER_GRID_STOP, false); + + assertEquals(1, bean.count(BEFORE_GRID_START)); + assertEquals(1, bean.count(AFTER_GRID_START)); + assertEquals(1, bean.count(BEFORE_GRID_STOP)); + assertEquals(0, bean.count(AFTER_GRID_STOP)); + } + + /** + * @param evt Error event. + * @param gridErr Grid error flag. + * @throws Exception If failed. + */ + private void checkOnStop(LifecycleEventType evt, boolean gridErr) throws Exception { + bean = new LifeCycleExceptionBean(evt, gridErr); + + try { + startGrid(); + + assertEquals(IgniteState.STARTED, G.state(getTestGridName())); + } + catch (IgniteCheckedException ignore) { + assertTrue(false); + } + finally { + try { + stopAllGrids(); + + assertEquals(IgniteState.STOPPED, G.state(getTestGridName())); + } + catch (Exception ignore) { + assertTrue(false); + } + } + } + + /** + * + */ + private static class LifeCycleBaseBean implements LifecycleBean { + /** */ + private Map<LifecycleEventType, AtomicInteger> callsCntr = + new EnumMap<>(LifecycleEventType.class); + + /** + * + */ + private LifeCycleBaseBean() { + for (LifecycleEventType t : LifecycleEventType.values()) + callsCntr.put(t, new AtomicInteger()); + } + + /** {@inheritDoc} */ + @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { + callsCntr.get(evt).incrementAndGet(); + } + + /** + * @param t Event type. + * @return Number of calls. + */ + public int count(LifecycleEventType t) { + return callsCntr.get(t).get(); + } + } + + /** + * + */ + private static class LifeCycleExceptionBean extends LifeCycleBaseBean { + /** */ + private LifecycleEventType errType; + + private boolean gridErr; + + /** + * @param errType type of event to throw error. + * @param gridErr {@code True} if {@link IgniteCheckedException}. 
+ */ + private LifeCycleExceptionBean(LifecycleEventType errType, boolean gridErr) { + this.errType = errType; + this.gridErr = gridErr; + } + + /** {@inheritDoc} */ + @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { + if (evt == errType) { + if (gridErr) + throw new IgniteCheckedException("Expected exception for event: " + evt) { + @Override public void printStackTrace(PrintStream s) { + // No-op. + } + + @Override public void printStackTrace(PrintWriter s) { + // No-op. + } + }; + else + throw new RuntimeException("Expected exception for event: " + evt) { + @Override public void printStackTrace(PrintStream s) { + // No-op. + } + + @Override public void printStackTrace(PrintWriter s) { + // No-op. + } + }; + } + + super.onLifecycleEvent(evt); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridListenActorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridListenActorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridListenActorSelfTest.java new file mode 100644 index 0000000..96926f5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridListenActorSelfTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.messaging.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Tests for {@link org.apache.ignite.messaging.MessagingListenActor}. + */ +public class GridListenActorSelfTest extends GridCommonAbstractTest { + /** */ + private static final int MSG_QTY = 10; + + /** */ + private static final int PING_PONG_STEPS = 10; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override protected void afterTest() throws Exception { + ((GridKernal)grid()).context().io(). + removeMessageListener(GridTopic.TOPIC_COMM_USER.name()); + } + + /** + * + * @throws Exception Thrown if failed. + */ + public void testBasicFlow() throws Exception { + final AtomicInteger cnt = new AtomicInteger(0); + + grid().message().localListen(null, new MessagingListenActor<String>() { + @Override + public void receive(UUID uuid, String rcvMsg) { + if ("TEST".equals(rcvMsg)) { + cnt.incrementAndGet(); + + // "Exit" after 1st message. + // Should never receive any more messages. 
+ stop(); + } else { + assert false : "Unknown message: " + rcvMsg; + + stop(); + } + } + }); + + grid().message().send(null, "TEST"); // This message we should receive. + + // Flood it. + for (int i = 0; i < 100; i++) + grid().message().send(null, "TEST"); // This message should be lost... + + Thread.sleep(2000); + + assert cnt.get() == 1 : "Count is " + cnt.get(); + } + + /** + * @throws Exception If failed. + */ + public void testImmediateStop() throws Exception { + doSendReceive(MSG_QTY, 1); + } + + /** + * @throws Exception If failed. + */ + public void testReceiveAll() throws Exception { + doSendReceive(MSG_QTY, MSG_QTY); + } + + /** + * Testing {@link org.apache.ignite.messaging.MessagingListenActor#respond(UUID, Object)} method. + * + * @throws Exception If failed. + */ + public void testRespondToRemote() throws Exception { + startGrid(1); + + try { + final ClusterNode rmt = grid(1).localNode(); + + grid().message().localListen(null, new MessagingListenActor<String>() { + @Override protected void receive(UUID nodeId, String rcvMsg) throws IgniteCheckedException { + System.out.println("Local node received message: '" + rcvMsg + "'"); + + respond(rmt.id(), "RESPONSE"); + } + }); + + final AtomicInteger cnt = new AtomicInteger(); + + // Response listener + grid(1).message().localListen(null, new MessagingListenActor<String>() { + @Override public void receive(UUID nodeId, String rcvMsg) { + if ("RESPONSE".equals(rcvMsg)) { + System.out.println("Remote node received message: '" + rcvMsg + "'"); + + cnt.incrementAndGet(); + } + } + }); + + grid().message().send(null, "REQUEST"); + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return cnt.intValue() == 1; + } + }, getTestTimeout()); + } + finally { + stopGrid(1); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testPingPong() throws Exception { + final AtomicInteger pingCnt = new AtomicInteger(); + final AtomicInteger pongCnt = new AtomicInteger(); + + final CountDownLatch latch = new CountDownLatch(PING_PONG_STEPS); + + grid().message().localListen(null, new MessagingListenActor<String>() { + @Override + protected void receive(UUID nodeId, String rcvMsg) throws IgniteCheckedException { + System.out.println("Received message: '" + rcvMsg + "'"); + + if ("PING".equals(rcvMsg)) { + pingCnt.incrementAndGet(); + + respond("PONG"); + } else if ("PONG".equals(rcvMsg)) { + pongCnt.incrementAndGet(); + + latch.countDown(); + + if (latch.getCount() > 0) + respond("PING"); + else + stop(); + } + } + }); + + grid().message().send(null, "PING"); + + latch.await(); + + assert pingCnt.intValue() == PING_PONG_STEPS; + assert pongCnt.intValue() == PING_PONG_STEPS; + } + + /** + * @param snd Sent messages quantity. + * @param rcv Max quantity of received messages before listener is removed. + * @throws Exception IF failed. 
+ */ + private void doSendReceive(int snd, final int rcv) throws Exception { + assert rcv > 0; + assert snd >= 0; + + final AtomicInteger cnt = new AtomicInteger(0); + + grid().message().localListen(null, new MessagingListenActor<String>() { + @Override + protected void receive(UUID nodeId, String rcvMsg) { + System.out.println(Thread.currentThread().getName() + "# Received message: '" + rcvMsg + "'"); + + cnt.incrementAndGet(); + + if (cnt.intValue() == rcv) { + System.out.println(Thread.currentThread().getName() + "Calling stop..."); + + stop(); + } else if (cnt.intValue() < rcv) + skip(); + else + assert false; + } + }); + + for (int i = 1; i <= snd; i++) { + String msg = "MESSAGE " + i; + + grid().message().send(null, msg); + + System.out.println(Thread.currentThread().getName() + "# Sent message: '" + msg + "'"); + } + + Thread.sleep(2000); + + assert cnt.intValue() == rcv; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridLocalEventListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridLocalEventListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridLocalEventListenerSelfTest.java new file mode 100644 index 0000000..18b6b20 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridLocalEventListenerSelfTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Test ensuring that event listeners are picked by started node. + */ +public class GridLocalEventListenerSelfTest extends GridCommonAbstractTest { + /** Whether event fired. */ + private final CountDownLatch fired = new CountDownLatch(1); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + int idx = getTestGridIndex(gridName); + + if (idx == 0) { + Map<IgnitePredicate<? extends IgniteEvent>, int[]> lsnrs = new HashMap<>(); + + lsnrs.put(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + fired.countDown(); + + return true; + } + }, new int[] { IgniteEventType.EVT_NODE_JOINED } ); + + cfg.setLocalEventListeners(lsnrs); + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** + * Test listeners notification. + * + * @throws Exception If failed. 
+ */ + public void testListener() throws Exception { + startGrids(2); + + assert fired.await(5000, TimeUnit.MILLISECONDS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridManagementJobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridManagementJobSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridManagementJobSelfTest.java new file mode 100644 index 0000000..35a4647 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridManagementJobSelfTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Test whether internal and visor tasks are routed to management pool. + */ +@GridCommonTest(group = "Kernal Self") +public class GridManagementJobSelfTest extends GridCommonAbstractTest { + /** Amount of nodes in the grid. */ + private static final int GRID_CNT = 3; + + /** Management pool threads name prefix. */ + private static final String MGMT_THREAD_PREFIX = "mgmt_thread_"; + + /** Name of a regular task. */ + private static final String TASK_NAME = "task"; + + /** IP finder. */ + private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * Do not start grid initially. + */ + public GridManagementJobSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + ExecutorService mgmtExecutor = Executors.newFixedThreadPool(10, new ThreadFactory() { + /** Counter for unique thread names. 
*/ + private AtomicLong ctr = new AtomicLong(); + + /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") + @Override public Thread newThread(Runnable r) { + Thread t = new Thread(r); + + t.setName(MGMT_THREAD_PREFIX + ctr.getAndIncrement()); + + return t; + } + }); + + cfg.setManagementExecutorService(mgmtExecutor); + + cfg.setManagementExecutorServiceShutdown(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * Ensure that regular tasks are executed within public pool while Visor and internal + * tasks are executed in management pool on remote nodes. + * + * @throws Exception If failed. + */ + public void testNamedTasks() throws Exception { + runJob(TASK_NAME, new TestJob()); + } + + /** + * Ensure that jobs annotated with {@link GridInternal} are always executed in + * management pool irrespective of task name. + * + * @throws Exception If failed. + */ + public void testAnnotatedTasks() throws Exception { + runJob(TASK_NAME, new TestJobInternal()); + } + + /** + * Execute the given job on remote nodes. + * + * @param taskName Name of the task in which context this job will be executed. + * @param job Job. + * @throws Exception If failed. + */ + private void runJob(String taskName, Callable<Object> job) throws Exception { + // We run a task on remote nodes because on local node jobs will be executed in system pool anyway. + compute(grid(0).forRemotes()).withName(taskName).call(job); + } + + /** + * Test job which ensures that its executor thread is not from management pool, + * as expected for a job running under a regular (neither internal nor Visor) task name. + */ + private static class TestJob implements Callable<Object>, Serializable { + /** Task session.
*/ + @IgniteTaskSessionResource + protected ComputeTaskSession ses; + + /** {@inheritDoc} */ + @Nullable @Override public Object call() throws IgniteCheckedException { + String threadName = Thread.currentThread().getName(); + + assertFalse(threadName.startsWith(MGMT_THREAD_PREFIX)); + + return null; + } + } + + /** + * Test job which ensures that it is always executed in management pool irrespectively + * of task name due to presence of {@link GridInternal} annotation. + */ + @GridInternal + private static class TestJobInternal implements Callable<Object>, Serializable { + /** {@inheritDoc} */ + @Nullable @Override public Object call() throws IgniteCheckedException { + String threadName = Thread.currentThread().getName(); + + assertTrue(threadName.startsWith(MGMT_THREAD_PREFIX)); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleJobsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleJobsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleJobsSelfTest.java new file mode 100644 index 0000000..beebb65 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleJobsSelfTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Tests multiple parallel jobs execution. + */ +@GridCommonTest(group = "Kernal Self") +public class GridMultipleJobsSelfTest extends GridCommonAbstractTest { + /** */ + private static final int LOG_MOD = 100; + + /** */ + private static final int TEST_TIMEOUT = 60 * 1000; + + /** IP finder. 
*/ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(1); + startGrid(2); + + assertEquals(2, grid(1).nodes().size()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(1); + stopGrid(2); + + assertEquals(0, G.allGrids().size()); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + if (getTestGridName(1).equals(gridName)) + c.setCacheConfiguration(/* no configured caches */); + else { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setBackups(1); + + c.setCacheConfiguration(cc); + } + + return c; + } + + /** + * @throws Exception If test failed. + */ + public void testNotAffinityJobs() throws Exception { + /* =========== Test properties =========== */ + int jobsNum = 5000; + int threadNum = 10; + + runTest(jobsNum, threadNum, NotAffinityJob.class); + } + + /** + * @throws Exception If test failed. + */ + public void testAffinityJobs() throws Exception { + /* =========== Test properties =========== */ + int jobsNum = 5000; + int threadNum = 10; + + runTest(jobsNum, threadNum, AffinityJob.class); + } + + /** + * @param jobsNum Number of jobs. + * @param threadNum Number of threads. + * @param jobCls Job class. + * @throws Exception If failed. + */ + private void runTest(final int jobsNum, int threadNum, final Class<? 
extends IgniteCallable<Boolean>> jobCls) + throws Exception { + final Ignite ignite1 = grid(1); + + final CountDownLatch latch = new CountDownLatch(jobsNum); + + final AtomicInteger jobsCnt = new AtomicInteger(); + + final AtomicInteger resCnt = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + while (true) { + int cnt = jobsCnt.incrementAndGet(); + + if (cnt > jobsNum) + break; + + IgniteCallable<Boolean> job; + + try { + job = jobCls.newInstance(); + } + catch (Exception e) { + throw new IgniteCheckedException("Could not instantiate a job.", e); + } + + IgniteCompute comp = ignite1.compute().enableAsync(); + + comp.call(job); + + IgniteFuture<Boolean> fut = comp.future(); + + if (cnt % LOG_MOD == 0) + X.println("Submitted jobs: " + cnt); + + fut.listenAsync(new CIX1<IgniteFuture<Boolean>>() { + @Override public void applyx(IgniteFuture<Boolean> f) throws IgniteCheckedException { + try { + assert f.get(); + } + finally { + latch.countDown(); + + long cnt = resCnt.incrementAndGet(); + + if (cnt % LOG_MOD == 0) + X.println("Results count: " + cnt); + } + } + }); + } + } + }, threadNum, "TEST-THREAD"); + + latch.await(); + } + + /** + * Test not affinity job. + */ + @SuppressWarnings({"PublicInnerClass"}) + public static class NotAffinityJob implements IgniteCallable<Boolean> { + /** */ + private static AtomicInteger cnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + int c = cnt.incrementAndGet(); + + if (c % LOG_MOD == 0) + X.println("Executed jobs: " + c); + + Thread.sleep(10); + + return true; + } + } + + /** + * Test affinity routed job. 
+ */ + @SuppressWarnings({"PublicInnerClass"}) + public static class AffinityJob implements IgniteCallable<Boolean> { + /** */ + private static AtomicInteger cnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + int c = cnt.incrementAndGet(); + + if (c % LOG_MOD == 0) + X.println("Executed affinity jobs: " + c); + + Thread.sleep(10); + + return true; + } + + /** + * @return Affinity key. + */ + @GridCacheAffinityKeyMapped + public String affinityKey() { + return "key"; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleSpisSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleSpisSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleSpisSelfTest.java new file mode 100644 index 0000000..3f4eb60 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleSpisSelfTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.checkpoint.sharedfs.*; +import org.apache.ignite.spi.failover.*; +import org.apache.ignite.spi.failover.always.*; +import org.apache.ignite.spi.loadbalancing.roundrobin.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +/** + * Multiple SPIs test. + */ +@GridCommonTest(group = "Kernal Self") +public class GridMultipleSpisSelfTest extends GridCommonAbstractTest { + /** */ + private boolean isTaskFailoverCalled; + + /** */ + private boolean isWrongTaskFailoverCalled; + + /** */ + private boolean isTaskLoadBalancingCalled; + + /** */ + private boolean isWrongTaskLoadBalancingCalled; + + /** */ + private boolean isTaskCheckPntCalled; + + /** */ + private boolean isWrongTaskCheckPntCalled; + + /** */ + private boolean isJobCheckPntCalled; + + /** */ + private boolean isWrongJobCheckPntCalled; + + /** */ + public GridMultipleSpisSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + GridTestFailoverSpi fail1 = new GridTestFailoverSpi("fail2"); + GridTestFailoverSpi fail2 = new GridTestFailoverSpi("fail2"); + + fail1.setName("fail1"); + fail2.setName("fail2"); + + GridTestLoadBalancingSpi load1 = new GridTestLoadBalancingSpi("load2"); + GridTestLoadBalancingSpi load2 = new GridTestLoadBalancingSpi("load2"); + + load1.setName("load1"); + load2.setName("load2"); + + GridTestCheckpointSpi cp1 = new GridTestCheckpointSpi("cp2"); + GridTestCheckpointSpi cp2 = new GridTestCheckpointSpi("cp2"); + + cp1.setName("cp1"); + cp2.setName("cp2"); + + cfg.setFailoverSpi(fail1, fail2); + 
cfg.setLoadBalancingSpi(load1, load2); + cfg.setCheckpointSpi(cp1, cp2); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"UnusedCatchParameter"}) + public void testFailoverTask() throws Exception { + // Start local and remote grids. + Ignite ignite1 = startGrid(1); + startGrid(2); + + try { + // Say grid1 is a local one. Deploy task and execute it. + ignite1.compute().localDeployTask(GridTestMultipleSpisTask.class, + GridTestMultipleSpisTask.class.getClassLoader()); + + try { + ignite1.compute().execute(GridTestMultipleSpisTask.class.getName(), ignite1.cluster().localNode().id()); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + + assert false : "Unexpected exception."; + } + } + finally { + stopGrid(2); + stopGrid(1); + } + + assert isTaskFailoverCalled : "Expected Failover SPI has not been called."; + assert isTaskLoadBalancingCalled : "Expected Load balancing SPI has not been called."; + assert isTaskCheckPntCalled : "Expected Checkpoint SPI has not been called on task side."; + assert isJobCheckPntCalled : "Expected Checkpoint SPI has not been called on job side."; + + // All of them should remain false. + assert !isWrongTaskFailoverCalled : "Unexpected Failover SPI has been called."; + assert !isWrongTaskLoadBalancingCalled : "Unexpected Load balancing SPI has been called."; + assert !isWrongTaskCheckPntCalled : "Unexpected Checkpoint SPI has been called on task side."; + assert !isWrongJobCheckPntCalled : "Unexpected Checkpoint SPI has been called on job side."; + } + + /** */ + private class GridTestFailoverSpi extends AlwaysFailoverSpi { + /** */ + private String expName; + + /** + * Creates new failover SPI. + * + * @param expName Name of the SPI expected to be called. 
+ */ + GridTestFailoverSpi(String expName) { + this.expName = expName; + } + + /** {@inheritDoc} */ + @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> grid) { + if (getName().equals(expName)) + isTaskFailoverCalled = true; + else + isWrongTaskFailoverCalled = true; + + return super.failover(ctx, grid); + } + } + + /** */ + private class GridTestLoadBalancingSpi extends RoundRobinLoadBalancingSpi { + /** */ + private String expName; + + /** + * Creates new load balancing SPI. + * + * @param expName Name of the SPI expected to be called. + */ + GridTestLoadBalancingSpi(String expName) { + this.expName = expName; + } + + /** {@inheritDoc} */ + @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, + ComputeJob job) throws IgniteCheckedException { + if (getName().equals(expName)) + isTaskLoadBalancingCalled = true; + else + isWrongTaskLoadBalancingCalled = true; + + return super.getBalancedNode(ses, top, job); + } + } + + /** */ + private class GridTestCheckpointSpi extends SharedFsCheckpointSpi { + /** */ + private String expName; + + /** + * Creates new checkpoint SPI. + * + * @param expName Name of the SPI expected to be called. + */ + GridTestCheckpointSpi(String expName) { + this.expName = expName; + } + + /** {@inheritDoc} */ + @Override public boolean saveCheckpoint(String key, byte[] state, long timeout, + boolean overwrite) throws IgniteSpiException { + if (getName().equals(expName)) + isTaskCheckPntCalled = true; + else + isWrongTaskCheckPntCalled = true; + + return super.saveCheckpoint(key, state, timeout, overwrite); + } + + /** {@inheritDoc} */ + @Override public byte[] loadCheckpoint(String key) throws IgniteSpiException { + if (getName().equals(expName)) + isJobCheckPntCalled = true; + else + isWrongJobCheckPntCalled = true; + + return super.loadCheckpoint(key); + } + } + + /** + * Task which splits to the jobs that uses SPIs from annotation. 
+ */ + @SuppressWarnings({"PublicInnerClass"}) + @ComputeTaskSpis(loadBalancingSpi = "load2", failoverSpi = "fail2", checkpointSpi = "cp2") + @ComputeTaskSessionFullSupport + public static final class GridTestMultipleSpisTask extends ComputeTaskAdapter<UUID, Integer> { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession taskSes; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, UUID arg) throws IgniteCheckedException { + assert subgrid.size() == 2; + assert taskSes != null; + assert ignite != null; + assert ignite.cluster().localNode().id().equals(arg); + + taskSes.saveCheckpoint("test", arg); + + // Always map job to the local node where it will fail. + return Collections.singletonMap(new GridTestMultipleSpisJob(arg), ignite.cluster().localNode()); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, + List<ComputeJobResult> received) throws IgniteCheckedException { + if (res.getException() != null) + return ComputeJobResultPolicy.FAILOVER; + + return super.result(res, received); + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) { + return null; + } + } + + /** + * Job that always throws exception. + */ + private static class GridTestMultipleSpisJob extends ComputeJobAdapter { + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession jobSes; + + /** + * @param arg Job argument. + */ + GridTestMultipleSpisJob(UUID arg) { + super(arg); + } + + /** {@inheritDoc} */ + @Override public UUID execute() throws IgniteCheckedException { + assert ignite != null; + assert jobSes != null; + assert argument(0) != null; + + // Should always fail on task originating node and work on another one. 
+ if (ignite.configuration().getNodeId().equals(argument(0))) + throw new IgniteCheckedException("Expected exception to failover job."); + + // Use checkpoint on job side. This will happen on remote node. + jobSes.loadCheckpoint("test"); + + return argument(0); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleVersionsDeploymentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleVersionsDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleVersionsDeploymentSelfTest.java new file mode 100644 index 0000000..9374083 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleVersionsDeploymentSelfTest.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * + */ +@GridCommonTest(group = "Kernal Self") +public class GridMultipleVersionsDeploymentSelfTest extends GridCommonAbstractTest { + /** Excluded classes. */ + private static final String[] EXCLUDE_CLASSES = new String[] { + GridDeploymentTestTask.class.getName(), + GridDeploymentTestJob.class.getName() + }; + + /** */ + public GridMultipleVersionsDeploymentSelfTest() { + super(/*start grid*/false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + // Override P2P configuration to exclude Task and Job classes + cfg.setPeerClassLoadingLocalClassPathExclude(GridDeploymentTestJob.class.getName(), + GridDeploymentTestTask.class.getName()); + + // Following tests makes sense in ISOLATED modes (they redeploy tasks + // and don't change task version. The different tasks with the same version from the same node + // executed in parallel - this does not work in share mode.) + cfg.setDeploymentMode(IgniteDeploymentMode.ISOLATED); + + cfg.setPeerClassLoadingLocalClassPathExclude( + "org.apache.ignite.internal.GridMultipleVersionsDeploymentSelfTest*"); + + return cfg; + } + + /** + * @param ignite Grid. + * @param taskName Task name. + * @return {@code true} if task has been deployed on passed grid. 
+ */ + private boolean checkDeployed(Ignite ignite, String taskName) { + Map<String, Class<? extends ComputeTask<?, ?>>> locTasks = ignite.compute().localTasks(); + + if (log().isInfoEnabled()) + log().info("Local tasks found: " + locTasks); + + return locTasks.get(taskName) != null; + } + + /** + * @throws Exception If test failed. + */ + @SuppressWarnings("unchecked") + public void testMultipleVersionsLocalDeploy() throws Exception { + try { + Ignite ignite = startGrid(1); + + ClassLoader ldr1 = new GridTestClassLoader( + Collections.singletonMap("testResource", "1"), + getClass().getClassLoader(), + EXCLUDE_CLASSES); + + ClassLoader ldr2 = new GridTestClassLoader( + Collections.singletonMap("testResource", "2"), + getClass().getClassLoader(), + EXCLUDE_CLASSES + ); + + Class<? extends ComputeTask<?, ?>> taskCls1 = (Class<? extends ComputeTask<?, ?>>)ldr1. + loadClass(GridDeploymentTestTask.class.getName()); + + Class<? extends ComputeTask<?, ?>> taskCls2 = (Class<? extends ComputeTask<?, ?>>)ldr2. + loadClass(GridDeploymentTestTask.class.getName()); + + ignite.compute().localDeployTask(taskCls1, ldr1); + + // Task will wait for the signal. + ComputeTaskFuture fut = executeAsync(ignite.compute(), "GridDeploymentTestTask", null); + + // We should wait here when to be sure that job has been started. + // Since we loader task/job classes with different class loaders we cannot + // use any kind of mutex because of the illegal state exception. + // We have to use timer here. DO NOT CHANGE 2 seconds. This should be enough + // on Bamboo. + Thread.sleep(2000); + + assert checkDeployed(ignite, "GridDeploymentTestTask"); + + // Deploy new one - this should move first task to the obsolete list. + ignite.compute().localDeployTask(taskCls2, ldr2); + + boolean deployed = checkDeployed(ignite, "GridDeploymentTestTask"); + + Object res = fut.get(); + + ignite.compute().undeployTask("GridDeploymentTestTask"); + + // New one should be deployed. 
+ assert deployed; + + // Wait for the execution. + assert res.equals(1); + } + finally { + stopGrid(1); + } + } + + /** + * @throws Exception If test failed. + */ + @SuppressWarnings("unchecked") + public void testMultipleVersionsP2PDeploy() throws Exception { + try { + Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + + final CountDownLatch latch = new CountDownLatch(2); + + g2.events().localListen( + new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + info("Received event: " + evt); + + latch.countDown(); + + return true; + } + }, EVT_TASK_UNDEPLOYED + ); + + ClassLoader ldr1 = new GridTestClassLoader( + Collections.singletonMap("testResource", "1"), + getClass().getClassLoader(), + EXCLUDE_CLASSES); + + ClassLoader ldr2 = new GridTestClassLoader( + Collections.singletonMap("testResource", "2"), + getClass().getClassLoader(), + EXCLUDE_CLASSES); + + Class<? extends ComputeTask<?, ?>> taskCls1 = (Class<? extends ComputeTask<?, ?>>)ldr1. + loadClass(GridDeploymentTestTask.class.getName()); + + Class<? extends ComputeTask<?, ?>> taskCls2 = (Class<? extends ComputeTask<?, ?>>)ldr2. + loadClass(GridDeploymentTestTask.class.getName()); + + g1.compute().localDeployTask(taskCls1, ldr1); + + // Task will wait for the signal. + ComputeTaskFuture fut1 = executeAsync(g1.compute(), "GridDeploymentTestTask", null); + + assert checkDeployed(g1, "GridDeploymentTestTask"); + + // We should wait here when to be sure that job has been started. + // Since we loader task/job classes with different class loaders we cannot + // use any kind of mutex because of the illegal state exception. + // We have to use timer here. DO NOT CHANGE 2 seconds here. + Thread.sleep(2000); + + // Deploy new one - this should move first task to the obsolete list. + g1.compute().localDeployTask(taskCls2, ldr2); + + // Task will wait for the signal. 
+ ComputeTaskFuture fut2 = executeAsync(g1.compute(), "GridDeploymentTestTask", null); + + boolean deployed = checkDeployed(g1, "GridDeploymentTestTask"); + + Object res1 = fut1.get(); + Object res2 = fut2.get(); + + g1.compute().undeployTask("GridDeploymentTestTask"); + + // New one should be deployed. + assert deployed; + + // Each run must return the digit from its own class loader's "testResource". + assert res1.equals(1); + assert res2.equals(2); + + stopGrid(1); + + assert latch.await(3000, MILLISECONDS); + + assert !checkDeployed(g2, "GridDeploymentTestTask"); + } + finally { + stopGrid(2); + stopGrid(1); + } + } + + /** + * Task that maps {@link GridDeploymentTestJob} either on local node + * or on remote nodes if there are any. Never on both. + */ + @SuppressWarnings({"PublicInnerClass"}) + @ComputeTaskName(value="GridDeploymentTestTask") + public static class GridDeploymentTestTask extends ComputeTaskAdapter<Object, Object> { + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) throws IgniteCheckedException { + Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size()); + + boolean ignoreLocNode = false; + + assert ignite != null; + + UUID locNodeId = ignite.configuration().getNodeId(); + + assert locNodeId != null; + + if (subgrid.size() == 1) + assert subgrid.get(0).id().equals(locNodeId) : "Wrong node id."; + else + ignoreLocNode = true; + + for (ClusterNode node : subgrid) { + // Ignore local node. + if (ignoreLocNode && node.id().equals(locNodeId)) + continue; + + map.put(new GridDeploymentTestJob(), node); + } + + return map; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return results.get(0).getData(); + } + } + + /** + * Simple job class that reads the resource named "testResource" through its own class loader + * and returns the digit stored in it as an integer.
+ */ + @SuppressWarnings({"PublicInnerClass"}) + public static class GridDeploymentTestJob extends ComputeJobAdapter { + /** Injected logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Integer execute() throws IgniteCheckedException { + try { + if (log.isInfoEnabled()) + log.info("GridDeploymentTestJob job started"); + + // Again there is no way to get access to any + // mutex of the test class because of the different class loaders. + // we have to wait. + Thread.sleep(3000); + + // Read the first byte of "testResource" via this job's own class loader and convert the ASCII + // digit to its numeric value; try-with-resources closes the stream so the handle is not leaked. + try (InputStream in = getClass().getClassLoader().getResourceAsStream("testResource")) { + return in.read() - '0'; + } + } + catch (IOException | InterruptedException e) { + throw new IgniteCheckedException("Failed to execute job.", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java new file mode 100644 index 0000000..b4e86d8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.collision.jobstealing.*; +import org.apache.ignite.spi.failover.jobstealing.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; +

/**
 * Multithreaded job stealing test. Several threads concurrently execute a task that
 * maps both of its jobs onto the local node; the test then checks how many of those
 * jobs ended up running on another node.
 */
@GridCommonTest(group = "Kernal Self")
public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest {
    /** Grid through which the tasks are submitted. */
    private Ignite ignite;

    /** */
    public GridMultithreadedJobStealingSelfTest() {
        super(false /* don't start grid*/);
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        ignite = startGridsMultiThreaded(2);
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        ignite = null;

        stopAllGrids();
    }

    /**
     * Test 2 jobs on 2 nodes.
     *
     * @throws Exception If test failed.
     */
    public void testTwoJobsMultithreaded() throws Exception {
        final AtomicReference<Exception> fail = new AtomicReference<>(null);

        final AtomicInteger stolen = new AtomicInteger(0);
        final AtomicInteger noneStolen = new AtomicInteger(0);

        int threadsNum = 10;

        GridTestUtils.runMultiThreaded(new Runnable() {
            /** */
            @Override public void run() {
                try {
                    JobStealingResult res = ignite.compute().execute(JobStealingTask.class, null);

                    info("Task result: " + res);

                    // Every task runs exactly two jobs; account for each of them.
                    switch(res) {
                        case NONE_STOLEN : {
                            noneStolen.addAndGet(2);
                            break;
                        }
                        case ONE_STOLEN : {
                            noneStolen.addAndGet(1);
                            stolen.addAndGet(1);
                            break;
                        }
                        case BOTH_STOLEN: {
                            stolen.addAndGet(2);
                            break;
                        }
                        default: {
                            assert false : "Result is: " + res;
                        }
                    }
                }
                catch (IgniteCheckedException e) {
                    log.error("Failed to execute task.", e);

                    // Keep the first failure: later ones are usually consequences of it.
                    fail.compareAndSet(null, e);
                }
            }
        }, threadsNum, "JobStealingThread");

        for (Ignite g : G.allGrids())
            info("Metrics [nodeId=" + g.cluster().localNode().id() +
                ", metrics=" + g.cluster().localNode().metrics() + ']');

        assert fail.get() == null : "Test failed with exception: " + fail.get();

        // Read the counters once so that the checks and the message refer to the same values.
        int stolenCnt = stolen.get();
        int noneStolenCnt = noneStolen.get();

        // Total jobs number is threadsNum * 2
        assert stolenCnt + noneStolenCnt == threadsNum * 2 : "Incorrect processed jobs number";

        assert stolenCnt != 0 : "No jobs were stolen.";

        // Under these circumstances we should not have more than 2 jobs
        // difference.
        assert Math.abs(stolenCnt - noneStolenCnt) <= 2 : "Stats [stolen=" + stolenCnt +
            ", noneStolen=" + noneStolenCnt + ']';
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi();

        // One job at a time.
        colSpi.setActiveJobsThreshold(1);
        colSpi.setWaitJobsThreshold(0);

        JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi();

        // Verify defaults.
        assert failSpi.getMaximumFailoverAttempts() == JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS;

        cfg.setCollisionSpi(colSpi);
        cfg.setFailoverSpi(failSpi);

        return cfg;
    }

    /**
     * Job stealing task. Maps two jobs onto the local node and reports, based on the
     * grid names the jobs return, how many of them were executed elsewhere.
     */
    private static class JobStealingTask extends ComputeTaskAdapter<Object, JobStealingResult> {
        /** Grid. */
        @IgniteInstanceResource
        private Ignite ignite;

        /** Logger. */
        @IgniteLoggerResource
        private IgniteLogger log;

        /**
         * Maps one job per subgrid node, all of them onto the local node.
         *
         * @param subgrid Nodes available for mapping; exactly two are expected.
         * @param arg Task argument (not used).
         * @return Jobs mapped to the local node.
         * @throws IgniteCheckedException Declared by the task contract.
         */
        @SuppressWarnings("ForLoopReplaceableByForEach")
        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
            @Nullable Object arg) throws IgniteCheckedException {
            assert subgrid.size() == 2 : "Invalid subgrid size: " + subgrid.size();

            Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size());

            ClusterNode locNode = ignite.cluster().localNode();

            // Put all jobs onto local node.
            for (int i = 0; i < subgrid.size(); i++)
                map.put(new GridJobStealingJob(2000L), locNode);

            return map;
        }

        /**
         * Compares the grid names returned by the two jobs with each other and with the
         * name of the grid this task instance is bound to.
         *
         * @param results Results of the two jobs.
         * @return How many of the two jobs reported a grid name other than the local one.
         * @throws IgniteCheckedException Declared by the task contract.
         */
        @Override public JobStealingResult reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
            assert results.size() == 2;

            if (log.isInfoEnabled()) {
                for (ComputeJobResult res : results)
                    log.info("Job result: " + res.getData());
            }

            Object obj0 = results.get(0).getData();

            if (obj0.equals(results.get(1).getData())) {
                // Both jobs ran on the same grid: either both stayed local or both moved.
                if (obj0.equals(ignite.name()))
                    return JobStealingResult.NONE_STOLEN;

                return JobStealingResult.BOTH_STOLEN;
            }

            return JobStealingResult.ONE_STOLEN;
        }
    }

    /**
     * Job stealing job. Sleeps for the time given as its argument and returns
     * the name of the grid it was executed on.
     */
    private static final class GridJobStealingJob extends ComputeJobAdapter {
        /** Injected grid. */
        @IgniteInstanceResource
        private Ignite ignite;

        /**
         * @param arg Job argument.
         */
        GridJobStealingJob(Long arg) {
            super(arg);
        }

        /**
         * Sleeps for the number of milliseconds passed as the first job argument.
         *
         * @return Name of the grid injected into this job.
         * @throws IgniteCheckedException If the sleep is interrupted.
         */
        @Override public Serializable execute() throws IgniteCheckedException {
            try {
                Long sleep = argument(0);

                assert sleep != null;

                Thread.sleep(sleep);
            }
            catch (InterruptedException e) {
                // Keep the interrupt status visible to the code that runs this job.
                Thread.currentThread().interrupt();

                throw new IgniteCheckedException("Job got interrupted.", e);
            }

            return ignite.name();
        }
    }

    /**
     * Job stealing result.
     */
    private enum JobStealingResult {
        /** Both jobs reported the same grid name, and it differs from the task's grid. */
        BOTH_STOLEN,

        /** The two jobs reported different grid names. */
        ONE_STOLEN,

        /** Both jobs reported the name of the task's grid. */
        NONE_STOLEN
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeFilterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeFilterSelfTest.java new file mode 100644 index 0000000..442754c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeFilterSelfTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + + package org.apache.ignite.internal; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.testframework.junits.common.*; + + import java.util.*; +

/**
 * Node filter test. Checks that a projection built with {@code forNodeId(...)}
 * contains exactly the node with the requested ID.
 */
@GridCommonTest(group = "Kernal Self")
public class GridNodeFilterSelfTest extends GridCommonAbstractTest {
    /** Grid instance. */
    private Ignite ignite;

    /** Remote instance. */
    private Ignite rmtIgnite;

    /** */
    public GridNodeFilterSelfTest() {
        super(false);
    }

    /**
     * Starts grids 1, 2 and 3; grid 1 is used as the local instance and grid 2 as the remote one.
     *
     * @throws Exception If a grid failed to start.
     */
    @Override protected void beforeTestsStarted() throws Exception {
        ignite = startGrid(1);

        rmtIgnite = startGrid(2);
        startGrid(3);
    }

    /**
     * Stops every grid started by this test and drops the references to them.
     *
     * @throws Exception If stopping the grids failed.
     */
    @Override protected void afterTestsStopped() throws Exception {
        try {
            // One call covers all started grids, so a failure on one does not skip the rest.
            stopAllGrids();
        }
        finally {
            ignite = null;
            rmtIgnite = null;
        }
    }

    /**
     * @throws Exception If failed.
     */
    public void testSynchronousExecute() throws Exception {
        UUID nodeId = ignite.cluster().localNode().id();

        UUID rmtNodeId = rmtIgnite.cluster().localNode().id();

        assertSingleNodeProjection(nodeId);
        assertSingleNodeProjection(rmtNodeId);
    }

    /**
     * Checks that the projection for the given node ID holds exactly that node.
     *
     * @param nodeId ID of the node to build the projection for.
     */
    private void assertSingleNodeProjection(UUID nodeId) {
        Collection<ClusterNode> nodes = ignite.cluster().forNodeId(nodeId).nodes();

        assertEquals("Unexpected projection size [nodeId=" + nodeId + ']', 1, nodes.size());
        assertEquals(nodeId, nodes.iterator().next().id());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeLocalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeLocalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeLocalSelfTest.java new file mode 100644 index 0000000..bf9b3b1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeLocalSelfTest.java @@ -0,0 +1,65 @@ +/* + *
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.testframework.junits.common.*; + + import java.util.*; +

/**
 * This test will test node local storage.
 */
@GridCommonTest(group = "Kernal Self")
public class GridNodeLocalSelfTest extends GridCommonAbstractTest {
    /** Create test. */
    public GridNodeLocalSelfTest() {
        super(/* Start grid. */true);
    }

    /**
     * Test node-local values operations: values stored under a string key and under
     * a three-element tuple key must be found again, while keys that were never
     * stored must not be reported as present.
     *
     * @throws Exception If test failed.
     */
    public void testNodeLocal() throws Exception {
        Ignite g = G.ignite(getTestGridName());

        String keyStr = "key";
        int keyNum = 1;
        Date keyDate = new Date();

        GridTuple3<Integer, String, Date> key = F.t(keyNum, keyStr, keyDate);

        ClusterNodeLocalMap<Object, Object> nl = g.cluster().nodeLocalMap();

        nl.put(keyStr, "Hello world!");
        nl.put(key, 12);

        assert nl.containsKey(keyStr);
        assert nl.containsKey(key);
        assert !nl.containsKey(keyNum);
        assert !nl.containsKey(F.t(keyNum, keyStr));

        assert "Hello world!".equals(nl.get(keyStr));

        // equals() instead of unboxing: a missing value fails the assertion rather than throwing NPE.
        assert Integer.valueOf(12).equals(nl.get(key));
    }
}