http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridDiscoveryEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridDiscoveryEventSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridDiscoveryEventSelfTest.java deleted file mode 100644 index ef78a1b..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridDiscoveryEventSelfTest.java +++ /dev/null @@ -1,421 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Tests discovery event topology snapshots. - */ -public class GridDiscoveryEventSelfTest extends GridCommonAbstractTest { - /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Daemon flag. */ - private boolean daemon; - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - daemon = false; - } - - /** */ - private static final IgniteClosure<ClusterNode, UUID> NODE_2ID = new IgniteClosure<ClusterNode, UUID>() { - @Override public UUID apply(ClusterNode n) { - return n.id(); - } - - @Override public String toString() { - return "Grid node shadow to node ID transformer closure."; - } - }; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - c.setDaemon(daemon); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - c.setDiscoverySpi(disco); - - c.setRestEnabled(false); - - return c; - } - - /** - * @throws Exception If failed. - */ - public void testJoinSequenceEvents() throws Exception { - try { - Ignite g0 = startGrid(0); - - UUID id0 = g0.cluster().localNode().id(); - - final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); - - g0.events().localListen(new IgnitePredicate<IgniteEvent>() { - private AtomicInteger cnt = new AtomicInteger(); - - @Override public boolean apply(IgniteEvent evt) { - assert evt.type() == EVT_NODE_JOINED; - - evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes()); - - return true; - } - }, EVT_NODE_JOINED); - - UUID id1 = startGrid(1).cluster().localNode().id(); - UUID id2 = startGrid(2).cluster().localNode().id(); - UUID id3 = startGrid(3).cluster().localNode().id(); - - U.sleep(100); - - assertEquals("Wrong count of events received", 3, evts.size()); - - Collection<ClusterNode> top0 = evts.get(0); - - assertNotNull(top0); - assertEquals(2, top0.size()); - assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0)); - assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id1)); - - Collection<ClusterNode> top1 = evts.get(1); - - assertNotNull(top1); - assertEquals(3, top1.size()); - assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0)); - assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1)); - assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id2)); - - Collection<ClusterNode> top2 = evts.get(2); - - assertNotNull(top2); - assertEquals(4, top2.size()); - assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0)); - assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1)); - assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2)); - assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id3)); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testLeaveSequenceEvents() throws Exception { - try { - Ignite g0 = startGrid(0); - - UUID id0 = g0.cluster().localNode().id(); - UUID id1 = startGrid(1).cluster().localNode().id(); - UUID id2 = startGrid(2).cluster().localNode().id(); - UUID id3 = startGrid(3).cluster().localNode().id(); - - final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); - - g0.events().localListen(new IgnitePredicate<IgniteEvent>() { - private AtomicInteger cnt = new AtomicInteger(); - - @Override public boolean apply(IgniteEvent evt) { - assert evt.type() == EVT_NODE_LEFT; - - evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes()); - - return true; - } - }, EVT_NODE_LEFT); - - stopGrid(3); - stopGrid(2); - stopGrid(1); - - U.sleep(100); - - assertEquals("Wrong count of events received", 3, evts.size()); - - Collection<ClusterNode> top2 = evts.get(0); - - assertNotNull(top2); - assertEquals(3, top2.size()); - assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0)); - assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1)); - assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2)); - assertFalse(F.viewReadOnly(top2, NODE_2ID).contains(id3)); - - Collection<ClusterNode> top1 = evts.get(1); - - assertNotNull(top1); - assertEquals(2, top1.size()); - assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0)); - assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1)); - assertFalse(F.viewReadOnly(top1, NODE_2ID).contains(id2)); - assertFalse(F.viewReadOnly(top1, NODE_2ID).contains(id3)); - - Collection<ClusterNode> top0 = evts.get(2); - - assertNotNull(top0); - assertEquals(1, top0.size()); - assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0)); - assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id1)); - assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id2)); - assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id3)); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testMixedSequenceEvents() throws Exception { - try { - Ignite g0 = startGrid(0); - - UUID id0 = g0.cluster().localNode().id(); - - final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); - - g0.events().localListen(new IgnitePredicate<IgniteEvent>() { - private AtomicInteger cnt = new AtomicInteger(); - - @Override public boolean apply(IgniteEvent evt) { - assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT; - - evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes()); - - return true; - } - }, EVT_NODE_JOINED, EVT_NODE_LEFT); - - UUID id1 = startGrid(1).cluster().localNode().id(); - UUID id2 = startGrid(2).cluster().localNode().id(); - UUID id3 = startGrid(3).cluster().localNode().id(); - - stopGrid(3); - stopGrid(2); - stopGrid(1); - - UUID id4 = startGrid(4).cluster().localNode().id(); - - stopGrid(4); - - U.sleep(100); - - assertEquals("Wrong count of events received", 8, evts.size()); - - Collection<ClusterNode> top0 = evts.get(0); - - assertNotNull(top0); - assertEquals(2, top0.size()); - assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0)); - assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id1)); - - Collection<ClusterNode> top1 = evts.get(1); - - assertNotNull(top1); - assertEquals(3, top1.size()); - assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0)); - assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1)); - assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id2)); - - Collection<ClusterNode> top2 = evts.get(2); - - assertNotNull(top2); - assertEquals(4, top2.size()); - assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0)); - assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1)); - assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2)); - assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id3)); - - Collection<ClusterNode> top3 = evts.get(3); - - assertNotNull(top3); - assertEquals(3, top3.size()); - assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id0)); - assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id1)); - assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id2)); - assertFalse(F.viewReadOnly(top3, NODE_2ID).contains(id3)); - - Collection<ClusterNode> top4 = evts.get(4); - - assertNotNull(top4); - assertEquals(2, top4.size()); - assertTrue(F.viewReadOnly(top4, NODE_2ID).contains(id0)); - assertTrue(F.viewReadOnly(top4, NODE_2ID).contains(id1)); - assertFalse(F.viewReadOnly(top4, NODE_2ID).contains(id2)); - assertFalse(F.viewReadOnly(top4, NODE_2ID).contains(id3)); - - Collection<ClusterNode> top5 = evts.get(5); - - assertNotNull(top5); - assertEquals(1, top5.size()); - assertTrue(F.viewReadOnly(top5, NODE_2ID).contains(id0)); - assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id1)); - assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id2)); - assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id3)); - - Collection<ClusterNode> top6 = evts.get(6); - - assertNotNull(top6); - assertEquals(2, top6.size()); - assertTrue(F.viewReadOnly(top6, NODE_2ID).contains(id0)); - assertTrue(F.viewReadOnly(top6, NODE_2ID).contains(id4)); - assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id1)); - assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id2)); - assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id3)); - - Collection<ClusterNode> top7 = evts.get(7); - - assertNotNull(top7); - assertEquals(1, top7.size()); - assertTrue(F.viewReadOnly(top7, NODE_2ID).contains(id0)); - assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id1)); - assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id2)); - assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id3)); - assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id4)); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentJoinEvents() throws Exception { - try { - Ignite g0 = startGrid(0); - - UUID id0 = g0.cluster().localNode().id(); - - final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); - - g0.events().localListen(new IgnitePredicate<IgniteEvent>() { - private AtomicInteger cnt = new AtomicInteger(); - - @Override public boolean apply(IgniteEvent evt) { - assert evt.type() == EVT_NODE_JOINED; - - X.println(">>>>>>> Joined " + F.viewReadOnly(((IgniteDiscoveryEvent) evt).topologyNodes(), - NODE_2ID)); - - evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes()); - - return true; - } - }, EVT_NODE_JOINED); - - U.sleep(100); - - startGridsMultiThreaded(1, 10); - - U.sleep(100); - - assertEquals(10, evts.size()); - - for (int i = 0; i < 10; i++) { - Collection<ClusterNode> snapshot = evts.get(i); - - assertEquals(2 + i, snapshot.size()); - assertTrue(F.viewReadOnly(snapshot, NODE_2ID).contains(id0)); - - for (ClusterNode n : snapshot) - assertTrue("Wrong node order in snapshot [i=" + i + ", node=" + n + ']', n.order() <= 2 + i); - } - - Collection<UUID> ids = F.viewReadOnly(evts.get(9), NODE_2ID); - - for (int i = 1; i <= 10; i++) - assertTrue(ids.contains(grid(i).localNode().id())); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testDaemonNodeJoin() throws Exception { - try { - startGridsMultiThreaded(3); - - final AtomicReference<IgniteCheckedException> err = new AtomicReference<>(); - - for (int i = 0; i < 3; i++) { - Ignite g = grid(i); - - g.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent) evt; - - if (discoEvt.topologyNodes().size() != 3) - err.compareAndSet(null, new IgniteCheckedException("Invalid discovery event [evt=" + discoEvt + - ", nodes=" + discoEvt.topologyNodes() + ']')); - - return true; - } - }, IgniteEventType.EVT_NODE_JOINED); - } - - daemon = true; - - GridKernal daemon = (GridKernal)startGrid(3); - - IgniteDiscoveryEvent join = daemon.context().discovery().localJoinEvent(); - - assertEquals(3, join.topologyNodes().size()); - - U.sleep(100); - - if (err.get() != null) - throw err.get(); - } - finally { - stopAllGrids(); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridDiscoverySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridDiscoverySelfTest.java deleted file mode 100644 index abe6fa3..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridDiscoverySelfTest.java +++ /dev/null @@ -1,419 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.product.*; -import org.apache.ignite.internal.managers.discovery.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.product.IgniteProductVersion.*; -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * GridDiscovery self test. - */ -public class GridDiscoverySelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static Ignite ignite; - - /** Nodes count. */ - private static final int NODES_CNT = 5; - - /** Maximum timeout when remote nodes join/left the topology */ - private static final int MAX_TIMEOUT_IN_MINS = 5; - - /** */ - public GridDiscoverySelfTest() { - super(/*start grid*/true); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - //cacheCfg.setName(null); - cacheCfg.setCacheMode(GridCacheMode.PARTITIONED); - - cfg.setCacheConfiguration(cacheCfg); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - ignite = G.ignite(getTestGridName()); - } - - /** - * @throws Exception If failed. - */ - public void testGetRemoteNodes() throws Exception { - Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes(); - - printNodes(nodes); - } - - /** - * @throws Exception If failed. - */ - public void testGetAllNodes() throws Exception { - Collection<ClusterNode> nodes = ignite.cluster().nodes(); - - printNodes(nodes); - - assert nodes != null; - assert !nodes.isEmpty(); - } - - /** - * @throws Exception If failed. - */ - public void testGetTopologyHash() throws Exception { - int hashCnt = 5000; - - Random rand = new Random(); - - Collection<Long> hashes = new HashSet<>(hashCnt, 1.0f); - - for (int i = 0; i < hashCnt; i++) { - // Max topology of 10 nodes. - int size = rand.nextInt(10) + 1; - - Collection<ClusterNode> nodes = new ArrayList<>(size); - - for (int j = 0; j < size; j++) - nodes.add(new GridDiscoveryTestNode()); - - @SuppressWarnings("deprecation") - long hash = ((GridKernal) ignite).context().discovery().topologyHash(nodes); - - boolean isHashed = hashes.add(hash); - - assert isHashed : "Duplicate hash [hash=" + hash + ", topSize=" + size + ", iteration=" + i + ']'; - } - - info("No duplicates found among '" + hashCnt + "' hashes."); - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings({"SuspiciousMethodCalls"}) - public void testGetLocalNode() throws Exception { - ClusterNode node = ignite.cluster().localNode(); - - assert node != null; - - Collection<ClusterNode> nodes = ignite.cluster().nodes(); - - assert nodes != null; - assert nodes.contains(node); - } - - /** - * @throws Exception If failed. - */ - public void testPingNode() throws Exception { - ClusterNode node = ignite.cluster().localNode(); - - assert node != null; - - boolean pingRes = ignite.cluster().pingNode(node.id()); - - assert pingRes : "Failed to ping local node."; - } - - /** - * @throws Exception If failed. - */ - public void testDiscoveryListener() throws Exception { - ClusterNode node = ignite.cluster().localNode(); - - assert node != null; - - final AtomicInteger cnt = new AtomicInteger(); - - /** Joined nodes counter. */ - final CountDownLatch joinedCnt = new CountDownLatch(NODES_CNT); - - /** Left nodes counter. */ - final CountDownLatch leftCnt = new CountDownLatch(NODES_CNT); - - IgnitePredicate<IgniteEvent> lsnr = new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - if (EVT_NODE_JOINED == evt.type()) { - cnt.incrementAndGet(); - - joinedCnt.countDown(); - } - else if (EVT_NODE_LEFT == evt.type()) { - int i = cnt.decrementAndGet(); - - assert i >= 0; - - leftCnt.countDown(); - } - else - assert false; - - return true; - } - }; - - ignite.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_JOINED); - - try { - for (int i = 0; i < NODES_CNT; i++) - startGrid(i); - - joinedCnt.await(MAX_TIMEOUT_IN_MINS, MINUTES); - - assert cnt.get() == NODES_CNT; - - for (int i = 0; i < NODES_CNT; i++) - stopGrid(i); - - leftCnt.await(MAX_TIMEOUT_IN_MINS, MINUTES); - - assert cnt.get() == 0; - - ignite.events().stopLocalListen(lsnr); - - assert cnt.get() == 0; - } - finally { - for (int i = 0; i < NODES_CNT; i++) - stopAndCancelGrid(i); - } - } - - /** - * Test cache nodes resolved correctly from topology history. - * - * @throws Exception In case of any exception. - */ - public void testCacheNodes() throws Exception { - // Validate only original node is available. - GridDiscoveryManager discoMgr = ((GridKernal) ignite).context().discovery(); - - Collection<ClusterNode> nodes = discoMgr.allNodes(); - - assert nodes.size() == 1 : "Expects only original node is available: " + nodes; - - final long topVer0 = discoMgr.topologyVersion(); - - assert topVer0 > 0 : "Unexpected initial topology version: " + topVer0; - - List<UUID> uuids = new ArrayList<>(NODES_CNT); - - UUID locId = ignite.cluster().localNode().id(); - - try { - // Start nodes. - for (int i = 0; i < NODES_CNT; i++) - uuids.add(startGrid(i).cluster().localNode().id()); - - // Stop nodes. - for (int i = 0; i < NODES_CNT; i++) - stopGrid(i); - - final long topVer = discoMgr.topologyVersion(); - - assert topVer == topVer0 + NODES_CNT * 2 : "Unexpected topology version: " + topVer; - - for (long ver = topVer0; ver <= topVer; ver++) { - Collection<UUID> exp = new ArrayList<>(); - - exp.add(locId); - - for (int i = 0; i < NODES_CNT && i < ver - topVer0; i++) - exp.add(uuids.get(i)); - - for (int i = 0; i < ver - topVer0 - NODES_CNT; i++) - exp.remove(uuids.get(i)); - - // Cache nodes by topology version (e.g. NODE_CNT == 3). - // 0 1 2 3 (node id) - // 1 (topVer) + - only local node - // 2 + + - // 3 + + + - // 4 + + + + - // 5 + + + - // 6 + + - // 7 + - only local node - - Collection<ClusterNode> cacheNodes = discoMgr.cacheNodes(null, ver); - - Collection<UUID> act = new ArrayList<>(F.viewReadOnly(cacheNodes, new C1<ClusterNode, UUID>() { - @Override public UUID apply(ClusterNode n) { - return n.id(); - } - })); - - assertEquals("Expects correct cache nodes for topology version: " + ver, exp, act); - } - } - finally { - for (int i = 0; i < NODES_CNT; i++) - stopAndCancelGrid(i); - } - } - - /** - * @param nodes Nodes. - */ - private void printNodes(Collection<ClusterNode> nodes) { - StringBuilder buf = new StringBuilder(); - - if (nodes != null && !nodes.isEmpty()) { - buf.append("Found nodes [nodes={"); - - int i = 0; - - for (Iterator<ClusterNode> iter = nodes.iterator(); iter.hasNext(); i++) { - ClusterNode node = iter.next(); - - buf.append(node.id()); - - if (i + 1 != nodes.size()) - buf.append(", "); - } - - buf.append("}]"); - } - else - buf.append("Found no nodes."); - - if (log().isDebugEnabled()) - log().debug(buf.toString()); - } - - /** - * - */ - private static class GridDiscoveryTestNode extends GridMetadataAwareAdapter implements ClusterNode { - /** */ - private static AtomicInteger consistentIdCtr = new AtomicInteger(); - - /** */ - private UUID nodeId = UUID.randomUUID(); - - /** */ - private Object consistentId = consistentIdCtr.incrementAndGet(); - - /** {@inheritDoc} */ - @Override public long order() { - return -1; - } - - /** {@inheritDoc} */ - @Override public IgniteProductVersion version() { - return fromString("99.99.99"); - } - - /** {@inheritDoc} */ - @Override public UUID id() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public Object consistentId() { - return consistentId; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Nullable @Override public <T> T attribute(String name) { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public ClusterNodeMetrics metrics() { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<String, Object> attributes() { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<String> addresses() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean isLocal() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isDaemon() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isClient() { - return false; - } - - /** {@inheritDoc} */ - @Override public Collection<String> hostNames() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - return F.eqNodes(this, o); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return id().hashCode(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageCheckAllEventsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageCheckAllEventsSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageCheckAllEventsSelfTest.java deleted file mode 100644 index 864f82a..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageCheckAllEventsSelfTest.java +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Test event storage. - */ -@GridCommonTest(group = "Kernal Self") -public class GridEventStorageCheckAllEventsSelfTest extends GridCommonAbstractTest { - /** */ - private static Ignite ignite; - - /** - * - */ - public GridEventStorageCheckAllEventsSelfTest() { - super(/*start grid*/true); - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - ignite = G.ignite(getTestGridName()); - - long tstamp = startTimestamp(); - - ignite.compute().localDeployTask(GridAllEventsTestTask.class, GridAllEventsTestTask.class.getClassLoader()); - - List<IgniteEvent> evts = pullEvents(tstamp, 1); - - assertEvent(evts.get(0).type(), EVT_TASK_DEPLOYED, evts); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - ignite = null; - } - - /** - * @param evtType Actual event type. - * @param expType Expected event type. - * @param evts Full list of events. - */ - private void assertEvent(int evtType, int expType, List<IgniteEvent> evts) { - assert evtType == expType : "Invalid event [evtType=" + evtType + ", expectedType=" + expType + - ", evts=" + evts + ']'; - } - - /** - * @throws Exception If test failed. - */ - public void testCheckpointEvents() throws Exception { - long tstamp = startTimestamp(); - - generateEvents(null, new GridAllCheckpointEventsTestJob()).get(); - - List<IgniteEvent> evts = pullEvents(tstamp, 11); - - assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); - assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); - assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); - assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); - assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts); - assertEvent(evts.get(5).type(), EVT_CHECKPOINT_LOADED, evts); - assertEvent(evts.get(6).type(), EVT_CHECKPOINT_REMOVED, evts); - assertEvent(evts.get(7).type(), EVT_JOB_RESULTED, evts); - assertEvent(evts.get(8).type(), EVT_TASK_REDUCED, evts); - assertEvent(evts.get(9).type(), EVT_TASK_FINISHED, evts); - assertEvent(evts.get(10).type(), EVT_JOB_FINISHED, evts); - } - - /** - * @throws Exception If test failed. - */ - public void testTaskUndeployEvents() throws Exception { - long tstamp = startTimestamp(); - - generateEvents(null, new GridAllEventsSuccessTestJob()).get(); - - ignite.compute().undeployTask(GridAllEventsTestTask.class.getName()); - ignite.compute().localDeployTask(GridAllEventsTestTask.class, GridAllEventsTestTask.class.getClassLoader()); - - List<IgniteEvent> evts = pullEvents(tstamp, 12); - - assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); - assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); - assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); - assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); - assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts); - assertEvent(evts.get(5).type(), EVT_CHECKPOINT_REMOVED, evts); - assertEvent(evts.get(6).type(), EVT_JOB_RESULTED, evts); - assertEvent(evts.get(7).type(), EVT_TASK_REDUCED, evts); - assertEvent(evts.get(8).type(), EVT_TASK_FINISHED, evts); - assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts); - assertEvent(evts.get(10).type(), EVT_TASK_UNDEPLOYED, evts); - assertEvent(evts.get(11).type(), EVT_TASK_DEPLOYED, evts); - } - - /** - * @throws Exception If test failed. - */ - public void testSuccessTask() throws Exception { - long tstamp = startTimestamp(); - - generateEvents(null, new GridAllEventsSuccessTestJob()).get(); - - List<IgniteEvent> evts = pullEvents(tstamp, 10); - - assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); - assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); - assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); - assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); - assertEvent(evts.get(4).type(), EVT_CHECKPOINT_SAVED, evts); - assertEvent(evts.get(5).type(), EVT_CHECKPOINT_REMOVED, evts); - assertEvent(evts.get(6).type(), EVT_JOB_RESULTED, evts); - assertEvent(evts.get(7).type(), EVT_TASK_REDUCED, evts); - assertEvent(evts.get(8).type(), EVT_TASK_FINISHED, evts); - assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts); - } - - /** - * @throws Exception If test failed. - */ - public void testFailTask() throws Exception { - long tstamp = startTimestamp(); - - ComputeTaskFuture<?> fut = generateEvents(null, new GridAllEventsFailTestJob()); - - try { - fut.get(); - - assert false : "Grid with locally executed job with timeout should throw GridComputeTaskTimeoutException."; - } - catch (IgniteCheckedException e) { - info("Expected exception caught [taskFuture=" + fut + ", exception=" + e + ']'); - } - - List<IgniteEvent> evts = pullEvents(tstamp, 7); - - assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); - assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); - assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); - assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); - assertEvent(evts.get(4).type(), EVT_JOB_RESULTED, evts); - assertEvent(evts.get(5).type(), EVT_TASK_FAILED, evts); - assertEvent(evts.get(6).type(), EVT_JOB_FAILED, evts); - } - - /** - * @throws Exception If test failed. - */ - public void testTimeoutTask() throws Exception { - long tstamp = startTimestamp(); - - ComputeTaskFuture<?> fut = generateEvents(1000L, new GridAllEventsTimeoutTestJob()); - - try { - fut.get(); - - assert false : "Task should fail."; - } - catch (ComputeTaskTimeoutException e) { - info("Expected timeout exception caught [taskFuture=" + fut + ", exception=" + e + ']'); - } - - List<IgniteEvent> evts = pullEvents(tstamp, 6); - - assertEvent(evts.get(0).type(), EVT_TASK_STARTED, evts); - assertEvent(evts.get(1).type(), EVT_JOB_MAPPED, evts); - assertEvent(evts.get(2).type(), EVT_JOB_QUEUED, evts); - assertEvent(evts.get(3).type(), EVT_JOB_STARTED, evts); - - boolean isTaskTimeout = false; - boolean isTaskFailed = false; - - for (int i = 4; i < evts.size(); i++) { - int evtType = evts.get(i).type(); - - if (evtType == EVT_TASK_TIMEDOUT) { - assert !isTaskTimeout; - assert !isTaskFailed; - - isTaskTimeout = true; - } - else if (evtType == EVT_TASK_FAILED) { - assert isTaskTimeout; - assert !isTaskFailed; - - isTaskFailed = true; - } - else { - assert evtType == EVT_JOB_CANCELLED - || evtType == EVT_JOB_TIMEDOUT - || evtType == EVT_JOB_FAILED - || evtType == EVT_JOB_FINISHED : - "Unexpected event: " + evts.get(i); - } - } - - assert isTaskTimeout; - assert isTaskFailed; - } - - /** - * Returns timestamp at the method call moment, but sleeps before return, - * to allow pass {@link GridUtils#currentTimeMillis()}. - * - * @return Call timestamp. - * @throws InterruptedException If sleep was interrupted. - */ - private long startTimestamp() throws InterruptedException { - long tstamp = System.currentTimeMillis(); - - Thread.sleep(20); - - return tstamp; - } - - /** - * Pull all test task related events since the given moment. - * - * @param since Earliest time to pulled events. - * @param evtCnt Expected event count - * @return List of events. - * @throws Exception If failed. - */ - private List<IgniteEvent> pullEvents(long since, int evtCnt) throws Exception { - IgnitePredicate<IgniteEvent> filter = new CustomEventFilter(GridAllEventsTestTask.class.getName(), since); - - for (int i = 0; i < 3; i++) { - List<IgniteEvent> evts = new ArrayList<>(ignite.events().localQuery((filter))); - - info("Filtered events [size=" + evts.size() + ", evts=" + evts + ']'); - - if (evtCnt != evts.size() && i < 2) { - U.warn(log, "Invalid event count (will retry in 1000 ms) [actual=" + evts.size() + - ", expected=" + evtCnt + ", evts=" + evts + ']'); - - U.sleep(1000); - - continue; - } - - assert evtCnt <= evts.size() : "Invalid event count [actual=" + evts.size() + ", expected=" + evtCnt + - ", evts=" + evts + ']'; - - return evts; - } - - assert false; - - return null; - } - - /** - * @param timeout Timeout. - * @param job Job. - * @return Task future. - * @throws Exception If failed. - */ - private ComputeTaskFuture<?> generateEvents(@Nullable Long timeout, ComputeJob job) throws Exception { - IgniteCompute comp = ignite.compute().enableAsync(); - - if (timeout == null) - comp.execute(GridAllEventsTestTask.class.getName(), job); - else - comp.withTimeout(timeout).execute(GridAllEventsTestTask.class.getName(), job); - - return comp.future(); - } - - /** - * - */ - private static class CustomEventFilter implements IgnitePredicate<IgniteEvent> { - /** */ - private final String taskName; - - /** */ - private final long tstamp; - - /** - * @param taskName Task name. - * @param tstamp Timestamp. - */ - CustomEventFilter(String taskName, long tstamp) { - assert taskName != null; - assert tstamp > 0; - - this.taskName = taskName; - this.tstamp = tstamp; - } - - /** {@inheritDoc} */ - @Override public boolean apply(IgniteEvent evt) { - if (evt.timestamp() >= tstamp) { - if (evt instanceof IgniteTaskEvent) - return taskName.equals(((IgniteTaskEvent)evt).taskName()); - else if (evt instanceof IgniteJobEvent) - return taskName.equals(((IgniteJobEvent)evt).taskName()); - else if (evt instanceof IgniteDeploymentEvent) - return taskName.equals(((IgniteDeploymentEvent)evt).alias()); - else if (evt instanceof IgniteCheckpointEvent) - return true; - } - - return false; - } - } - - /** - * - */ - private static class GridAllEventsSuccessTestJob extends ComputeJobAdapter { - /** */ - @IgniteTaskSessionResource - private ComputeTaskSession taskSes; - - /** {@inheritDoc} */ - @Override public String execute() throws IgniteCheckedException { - assert taskSes != null; - - taskSes.saveCheckpoint("testCheckpoint", "TestState"); - taskSes.removeCheckpoint("testCheckpoint"); - - return "GridAllEventsSuccessTestJob-test-event-success."; - } - } - - /** - * - */ - private static class GridAllEventsFailTestJob extends ComputeJobAdapter { - /** {@inheritDoc} */ - @Override public String execute() { - throw new RuntimeException("GridAllEventsFailTestJob expected test exception."); - } - } - - /** - */ - private static class GridAllEventsTimeoutTestJob extends ComputeJobAdapter { - /** */ - @IgniteLoggerResource - private IgniteLogger log; - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override public String execute() { - try { - while (!isCancelled()) - Thread.sleep(5000); - } - catch (InterruptedException ignored) { - if (log.isInfoEnabled()) - log.info("GridAllEventsTimeoutTestJob was interrupted."); - - return "GridAllEventsTimeoutTestJob-test-event-timeout."; - } - - return "GridAllEventsTimeoutTestJob-test-event-timeout."; - } - } - - /** - * - */ - private static class GridAllCheckpointEventsTestJob extends ComputeJobAdapter { - /** */ - @IgniteTaskSessionResource - private ComputeTaskSession taskSes; - - /** {@inheritDoc} */ - @Override public String execute() throws IgniteCheckedException { - assert taskSes != null; - - taskSes.saveCheckpoint("testAllCheckpoint", "CheckpointTestState"); - taskSes.loadCheckpoint("testAllCheckpoint"); - taskSes.removeCheckpoint("testAllCheckpoint"); - - return "GridAllCheckpointEventsSuccess-test-all-checkpoint-event-success."; - } - } - - /** - * - */ - @ComputeTaskSessionFullSupport - private static class GridAllEventsTestTask extends ComputeTaskSplitAdapter<Object, Object> { - /** {@inheritDoc} */ - @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException { - return Collections.singleton((ComputeJob)arg); - } - - /** {@inheritDoc} */ - @Override public Serializable reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - assert results != null; - assert results.size() == 1; - - return (Serializable)results; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageRuntimeConfigurationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageRuntimeConfigurationSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageRuntimeConfigurationSelfTest.java deleted file mode 100644 index c458b66..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageRuntimeConfigurationSelfTest.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; -import org.junit.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Tests for runtime events configuration. - */ -public class GridEventStorageRuntimeConfigurationSelfTest extends GridCommonAbstractTest { - /** */ - private int[] inclEvtTypes; - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setIncludeEventTypes(inclEvtTypes); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testEnableWithDefaults() throws Exception { - inclEvtTypes = null; - - try { - Ignite g = startGrid(); - - final AtomicInteger cnt = new AtomicInteger(); - - g.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - cnt.incrementAndGet(); - - return true; - } - }, EVT_TASK_STARTED); - - g.compute().run(F.noop()); - - assertEquals(0, cnt.get()); - - g.events().enableLocal(EVT_TASK_STARTED); - - g.compute().run(F.noop()); - - assertEquals(1, cnt.get()); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testEnableWithIncludes() throws Exception { - inclEvtTypes = new int[] { EVT_TASK_STARTED, EVT_TASK_FINISHED }; - - try { - Ignite g = startGrid(); - - final AtomicInteger cnt = new AtomicInteger(); - - g.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - cnt.incrementAndGet(); - - return true; - } - }, EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_JOB_STARTED); - - g.compute().run(F.noop()); - - assertEquals(2, cnt.get()); - - g.events().enableLocal(EVT_TASK_FINISHED, EVT_JOB_STARTED); - - g.compute().run(F.noop()); - - assertEquals(5, cnt.get()); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testDisableWithIncludes() throws Exception { - inclEvtTypes = null; - - try { - Ignite g = startGrid(); - - g.events().enableLocal(EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_JOB_STARTED); - - final AtomicInteger cnt = new AtomicInteger(); - - g.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - cnt.incrementAndGet(); - - return true; - } - }, EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_JOB_STARTED); - - g.compute().run(F.noop()); - - assertEquals(3, cnt.get()); - - g.events().disableLocal(EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_JOB_FAILED); - - g.compute().run(F.noop()); - - assertEquals(4, cnt.get()); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testEnableDisable() throws Exception { - inclEvtTypes = null; - - try { - Ignite g = startGrid(); - - IgniteEvents evts = g.events(); - - evts.enableLocal(EVT_CACHE_OBJECT_PUT); - - evts.disableLocal(EVT_CACHE_OBJECT_PUT); - - for (int evtType : evts.enabledEvents()) - assertFalse(evtType == EVT_CACHE_OBJECT_PUT); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("UnusedDeclaration") - public void testInvalidTypes() throws Exception { - inclEvtTypes = new int[]{EVT_TASK_STARTED}; - - try (Ignite g = startGrid()) { - assertTrue(g.events().isEnabled(EVT_TASK_STARTED)); - - try { - g.events().isEnabled(-13); - - fail("Expected IgniteCheckedException"); - } - catch (IllegalArgumentException e) { - info("Caught expected exception: " + e); - } - } - finally { - stopAllGrids(); - } - - inclEvtTypes = new int[]{-13}; - - try (Ignite g = startGrid()) { - fail("Expected IgniteCheckedException"); - } - catch (IgniteCheckedException e) { - info("Caught expected exception: " + e); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testGetters() throws Exception { - inclEvtTypes = new int[]{EVT_TASK_STARTED, EVT_TASK_FINISHED, 30000}; - - try { - Ignite g = startGrid(); - - assertEqualsWithoutOrder(inclEvtTypes, getEnabledEvents(g)); - assertEqualsWithoutOrder(inclEvtTypes, getEnabledEvents(1013, g, 30000)); - - g.events().enableLocal(20000, EVT_TASK_STARTED, EVT_CACHE_ENTRY_CREATED); - - assertEqualsWithoutOrder( - new int[] {EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_CACHE_ENTRY_CREATED, 20000, 30000}, - getEnabledEvents(g)); - - assertEqualsWithoutOrder( - new int[] {EVT_TASK_STARTED, EVT_TASK_FINISHED, EVT_CACHE_ENTRY_CREATED, 20000, 30000}, - getEnabledEvents(1013, g, 20000, 30000)); - - g.events().disableLocal(20000, 20001, 30000, EVT_TASK_STARTED, EVT_CACHE_ENTRY_CREATED); - - assertEqualsWithoutOrder( - new int[] {EVT_TASK_FINISHED, EVT_TASK_STARTED, 30000}, - getEnabledEvents(g)); - - assertEqualsWithoutOrder( - new int[] {EVT_TASK_FINISHED, EVT_TASK_STARTED, 30000}, - getEnabledEvents(1013, g, 20000, 30000)); - - int[] a = new int[1013]; - - for (int i = 0; i < 1000; i++) - a[i] = 1001 + i; - - a[1000] = EVT_TASK_TIMEDOUT; - a[1001] = EVT_TASK_STARTED; - - randomShuffle(a, 1002); - - int[] a0 = Arrays.copyOf(a, a.length + 1); - - g.events().enableLocal(Arrays.copyOf(a, 1002)); - - a0[1002] = EVT_TASK_FINISHED; - a0[1003] = 30000; - - assertEqualsWithoutOrder(Arrays.copyOf(a0, 1004), getEnabledEvents(g)); - assertEqualsWithoutOrder(Arrays.copyOf(a0, 1004), getEnabledEvents(2013, g, 30000)); - - g.events().disableLocal(Arrays.copyOf(a, 1002)); - - assertEqualsWithoutOrder( - new int[] {EVT_TASK_STARTED, EVT_TASK_FINISHED, 30000}, - getEnabledEvents(g)); - - assertEqualsWithoutOrder( - new int[] {EVT_TASK_STARTED, EVT_TASK_FINISHED, 30000}, - getEnabledEvents(1013, g, 20000, 30000)); - } - finally { - stopAllGrids(); - } - } - - /** - * @param a Array. - * @param len Prefix length. - */ - private void randomShuffle(int[] a, int len) { - Random rand = new Random(); - - for (int i = len - 1; i > 0; i--) { - int j = rand.nextInt(i); - - int t = a[i]; - a[i] = a[j]; - a[j] = t; - } - } - - /** - * @param a First array. - * @param b Second array. - */ - private void assertEqualsWithoutOrder(int[] a, int[] b) { - assertNotNull(a); - assertNotNull(b); - assertEquals(a.length, b.length); - - int[] a0 = Arrays.copyOf(a, a.length); - int[] b0 = Arrays.copyOf(a, a.length); - - Arrays.sort(a0); - Arrays.sort(b0); - - Assert.assertArrayEquals(a0, b0); - } - - /** - * @param g Grid. - * @return Enabled events. - */ - private int[] getEnabledEvents(Ignite g) { - return g.events().enabledEvents(); - } - - /** - * @param limit Loop limit. - * @param g Grid. - * @param customTypes Array of event types. - * @return Enabled events counted with loop (1..limit) and checks of custom types. - */ - private int[] getEnabledEvents(int limit, Ignite g, int... customTypes) { - Collection<Integer> res = new HashSet<>(); - - IgniteEvents evts = g.events(); - - for (int i = 1; i <= limit; i++) { - if (evts.isEnabled(i)) - res.add(i); - } - - if (customTypes != null) { - for (int i : customTypes) - if (evts.isEnabled(i)) - res.add(i); - } - - return U.toIntArray(res); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageSelfTest.java deleted file mode 100644 index be605de..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridEventStorageSelfTest.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Event storage tests. - * - * Note: - * Test based on events generated by test task execution. - * Filter class must be static because it will be send to remote host in - * serialized form. - */ -@GridCommonTest(group = "Kernal Self") -public class GridEventStorageSelfTest extends GridCommonAbstractTest { - /** First grid. */ - private static Ignite ignite1; - - /** Second grid. */ - private static Ignite ignite2; - - /** */ - public GridEventStorageSelfTest() { - super(/*start grid*/false); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - ignite1 = startGrid(1); - ignite2 = startGrid(2); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** - * @throws Exception In case of error. - */ - public void testAddRemoveGlobalListener() throws Exception { - IgnitePredicate<IgniteEvent> lsnr = new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - info("Received local event: " + evt); - - return true; - } - }; - - ignite1.events().localListen(lsnr, EVTS_ALL_MINUS_METRIC_UPDATE); - - assert ignite1.events().stopLocalListen(lsnr); - } - - /** - * @throws Exception In case of error. - */ - public void testAddRemoveDiscoListener() throws Exception { - IgnitePredicate<IgniteEvent> lsnr = new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - info("Received local event: " + evt); - - return true; - } - }; - - ignite1.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); - - assert ignite1.events().stopLocalListen(lsnr); - assert !ignite1.events().stopLocalListen(lsnr); - } - - /** - * @throws Exception In case of error. - */ - public void testLocalNodeEventStorage() throws Exception { - TestEventListener lsnr = new TestEventListener(); - - IgnitePredicate<IgniteEvent> filter = new TestEventFilter(); - - // Check that two same listeners may be added. - ignite1.events().localListen(lsnr, EVT_TASK_STARTED); - ignite1.events().localListen(lsnr, EVT_TASK_STARTED); - - // Execute task. - generateEvents(ignite1); - - assert lsnr.getCounter() == 1; - - Collection<IgniteEvent> evts = ignite1.events().localQuery(filter); - - assert evts != null; - assert evts.size() == 1; - - // Execute task. - generateEvents(ignite1); - - // Check that listener has been removed. - assert lsnr.getCounter() == 2; - - // Check that no problems with nonexistent listeners. - assert ignite1.events().stopLocalListen(lsnr); - assert !ignite1.events().stopLocalListen(lsnr); - - // Check for events from local node. - evts = ignite1.events().localQuery(filter); - - assert evts != null; - assert evts.size() == 2; - - // Check for events from empty remote nodes collection. - try { - events(ignite1.cluster().forPredicate(F.<ClusterNode>alwaysFalse())).remoteQuery(filter, 0); - } - catch (ClusterGroupEmptyException ignored) { - // No-op - } - } - - /** - * @throws Exception In case of error. - */ - public void testRemoteNodeEventStorage() throws Exception { - IgnitePredicate<IgniteEvent> filter = new TestEventFilter(); - - generateEvents(ignite2); - - ClusterGroup prj = ignite1.cluster().forPredicate(F.remoteNodes(ignite1.cluster().localNode().id())); - - Collection<IgniteEvent> evts = events(prj).remoteQuery(filter, 0); - - assert evts != null; - assert evts.size() == 1; - } - - /** - * @throws Exception In case of error. - */ - public void testRemoteAndLocalNodeEventStorage() throws Exception { - IgnitePredicate<IgniteEvent> filter = new TestEventFilter(); - - generateEvents(ignite1); - - Collection<IgniteEvent> evts = ignite1.events().remoteQuery(filter, 0); - Collection<IgniteEvent> locEvts = ignite1.events().localQuery(filter); - Collection<IgniteEvent> remEvts = - events(ignite1.cluster().forPredicate(F.remoteNodes(ignite1.cluster().localNode().id()))).remoteQuery(filter, 0); - - assert evts != null; - assert locEvts != null; - assert remEvts != null; - assert evts.size() == 1; - assert locEvts.size() == 1; - assert remEvts.isEmpty(); - } - - /** - * Create events in grid. - * - * @param ignite Grid. - * @throws IgniteCheckedException In case of error. - */ - private void generateEvents(Ignite ignite) throws IgniteCheckedException { - ignite.compute().localDeployTask(GridEventTestTask.class, GridEventTestTask.class.getClassLoader()); - - ignite.compute().execute(GridEventTestTask.class.getName(), null); - } - - /** - * Test task. - */ - private static class GridEventTestTask extends ComputeTaskSplitAdapter<Object, Object> { - /** {@inheritDoc} */ - @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException { - return Collections.singleton(new GridEventTestJob()); - } - - /** {@inheritDoc} */ - @Override public Serializable reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - assert results != null; - assert results.size() == 1; - - return results.get(0).getData(); - } - } - - /** - * Test job. - */ - private static class GridEventTestJob extends ComputeJobAdapter { - /** {@inheritDoc} */ - @Override public String execute() throws IgniteCheckedException { - return "GridEventTestJob-test-event."; - } - } - - /** - * Test event listener. - */ - private class TestEventListener implements IgnitePredicate<IgniteEvent> { - /** Event counter. */ - private AtomicInteger cnt = new AtomicInteger(); - - /** {@inheritDoc} */ - @Override public boolean apply(IgniteEvent evt) { - info("Event storage event: evt=" + evt); - - // Count only started tasks. - if (evt.type() == EVT_TASK_STARTED) - cnt.incrementAndGet(); - - return true; - } - - /** - * @return Event counter value. - */ - public int getCounter() { - return cnt.get(); - } - - /** - * Clear event counter. - */ - public void clearCounter() { - cnt.set(0); - } - } - - /** - * Test event filter. - */ - private static class TestEventFilter implements IgnitePredicate<IgniteEvent> { - /** {@inheritDoc} */ - @Override public boolean apply(IgniteEvent evt) { - // Accept only predefined TASK_STARTED events. - return evt.type() == EVT_TASK_STARTED; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/gridgain/grid/kernal/GridExecutorServiceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/GridExecutorServiceTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/GridExecutorServiceTest.java deleted file mode 100644 index 007ea36..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/GridExecutorServiceTest.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.executor.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Grid distributed executor test. - */ -@GridCommonTest(group = "Thread Tests") -public class GridExecutorServiceTest extends GridCommonAbstractTest { - /** */ - public GridExecutorServiceTest() { - super(true); - } - - /** - * @throws Exception Thrown in case of test failure. - */ - public void testExecute() throws Exception { - Ignite ignite = G.ignite(getTestGridName()); - - ExecutorService srvc = createExecutorService(ignite); - - srvc.execute(new Runnable() { - @IgniteInstanceResource - private Ignite ignite; - - @Override public void run() { - System.out.println("Test message."); - - assert this.ignite != null; - } - }); - - srvc.execute(new TestRunnable()); - - srvc.shutdown(); - } - - /** - * @throws Exception Thrown in case of test failure. - */ - public void testSubmit() throws Exception { - Ignite ignite = G.ignite(getTestGridName()); - - ExecutorService srvc = createExecutorService(ignite); - - Future<?> fut = srvc.submit(new TestRunnable()); - - Object res = fut.get(); - - info("Default Runnable result:" + res); - - assert res == null : "Failed to get valid default result for submitted Runnable: " + res; - - String val = "test-value"; - - fut = srvc.submit(new TestRunnable(), val); - - res = fut.get(); - - info("Defined Runnable result:" + res); - - assert val.equals(res) : "Failed to get valid predefined result for submitted Runnable: " + res; - - fut = srvc.submit(new TestCallable<>(val)); - - res = fut.get(); - - info("Callable result:" + res); - - assert val.equals(res) : "Failed to get valid result for submitted Callable: " + res; - - srvc.shutdown(); - } - - /** - * @throws Exception Thrown in case of test failure. - */ - public void testSubmitWithFutureTimeout() throws Exception { - Ignite ignite = G.ignite(getTestGridName()); - - ExecutorService srvc = createExecutorService(ignite); - - Future<Integer> fut = srvc.submit(new TestCallable<>(3000)); // Just sleep for 3 seconds. - - boolean ok = true; - - try { - fut.get(1, TimeUnit.SECONDS); - - ok = false; - } - catch (TimeoutException e) { - info("Task timeout elapsed: " + e.getMessage()); - } - - assert ok : "Timeout must be thrown."; - - srvc.shutdown(); - } - - /** - * @throws Exception Thrown in case of test failure. - */ - @SuppressWarnings("TooBroadScope") - public void testInvokeAll() throws Exception { - Ignite ignite = G.ignite(getTestGridName()); - - ExecutorService srvc = createExecutorService(ignite); - - Collection<Callable<String>> cmds = new ArrayList<>(2); - - String val1 = "test-value-1"; - String val2 = "test-value-2"; - - cmds.add(new TestCallable<>(val1)); - cmds.add(new TestCallable<>(val2)); - - List<Future<String>> futs = srvc.invokeAll(cmds); - - assert futs != null; - assert futs.size() == 2; - - String res1 = futs.get(0).get(); - String res2 = futs.get(1).get(); - - assert val1.equals(res1) : "Failed to get valid result for first command: " + res1; - assert val2.equals(res2) : "Failed to get valid result for second command: " + res2; - - srvc.shutdown(); - } - - /** - * @throws Exception Thrown in case of test failure. - */ - @SuppressWarnings("TooBroadScope") - public void testInvokeAllWithTimeout() throws Exception { - Ignite ignite = G.ignite(getTestGridName()); - - ExecutorService srvc = createExecutorService(ignite); - - Collection<Callable<Integer>> cmds = new ArrayList<>(); - - cmds.add(new TestCallable<>(3000)); // Just sleeps for 3 seconds. - cmds.add(new TestCallable<>(3000)); // Just sleeps for 3 seconds. - - List<Future<Integer>> fut = srvc.invokeAll(cmds, 1, TimeUnit.SECONDS); - - assert fut != null; - assert fut.size() == 2; - - boolean ok = true; - - try { - fut.get(0).get(); - - ok = false; - } - catch (CancellationException e) { - info("First timeout task is cancelled: " + e.getMessage()); - } - - assert ok : "First task must be cancelled."; - - try { - fut.get(1).get(); - - ok = false; - } - catch (CancellationException e) { - info("Second timeout task is cancelled: " + e.getMessage()); - } - - assert ok : "Second task must be cancelled."; - - srvc.shutdown(); - } - - /** - * @throws Exception Thrown in case of test failure. - */ - @SuppressWarnings("TooBroadScope") - public void testInvokeAny() throws Exception { - Ignite ignite = G.ignite(getTestGridName()); - - ExecutorService srvc = createExecutorService(ignite); - - Collection<Callable<String>> cmds = new ArrayList<>(2); - - String val1 = "test-value-1"; - String val2 = "test-value-2"; - - cmds.add(new TestCallable<>(val1)); - cmds.add(new TestCallable<>(val2)); - - String res = srvc.invokeAny(cmds); - - info("Result: " + res); - - assert val1.equals(res) : "Failed to get valid result: " + res; - - srvc.shutdown(); - } - - /** - * @throws Exception Thrown in case of test failure. - */ - @SuppressWarnings("TooBroadScope") - public void testInvokeAnyWithTimeout() throws Exception { - Ignite ignite = G.ignite(getTestGridName()); - - ExecutorService srvc = createExecutorService(ignite); - - Collection<Callable<Integer>> timeoutCmds = new ArrayList<>(2); - - timeoutCmds.add(new TestCallable<>(5000)); - timeoutCmds.add(new TestCallable<>(5000)); - - boolean ok = true; - - try { - srvc.invokeAny(timeoutCmds, 1, TimeUnit.SECONDS); - - ok = false; - } - catch (TimeoutException e) { - info("Task timeout elapsed: " + e.getMessage()); - } - - assert ok : "Timeout must be thrown."; - - srvc.shutdown(); - } - - /** - * @param ignite Grid instance. - * @return Thrown in case of test failure. - */ - private ExecutorService createExecutorService(Ignite ignite) { - assert ignite != null; - - return new GridExecutorService((ClusterGroupAdapter) ignite, log()); - } - - /** - * @param <T> Type of the {@link Callable} argument. - */ - private static class TestCallable<T> implements Callable<T>, Serializable { - /** */ - private T data; - - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** - * @param data Data. - */ - TestCallable(T data) { - this.data = data; - } - - /** {@inheritDoc} */ - @Override public T call() throws Exception { - System.out.println("Test callable message."); - - assert ignite != null; - - if (data instanceof Integer) - Thread.sleep((Integer)data); - - return data; - } - } - - /** */ - private static class TestRunnable implements Runnable, Serializable { - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** {@inheritDoc} */ - @Override public void run() { - System.out.println("Test Runnable message."); - - assert ignite != null; - } - } -}