http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java new file mode 100644 index 0000000..12b6458 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class DataStreamerMultinodeCreateCacheTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setSocketTimeout(50); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setAckTimeout(50); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testCreateCacheAndStream() throws Exception { + final int THREADS = 5; + + startGrids(THREADS); + + final AtomicInteger idx = new AtomicInteger(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int threadIdx = idx.getAndIncrement(); + + long stopTime = System.currentTimeMillis() + 60_000; + + Ignite ignite = grid(threadIdx); + + int iter = 0; + + while (System.currentTimeMillis() < stopTime) { + String cacheName = "cache-" + threadIdx + "-" + (iter % 10); + + try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cacheName)) { + try (IgniteDataStreamer<Object, Object> stmr = ignite.dataStreamer(cacheName)) { + ((DataStreamerImpl<Object, Object>)stmr).maxRemapCount(0); + + for (int i = 0; i < 1000; i++) + stmr.addData(i, i); + } + } + + iter++; + } + + return null; + } + }, THREADS, "create-cache"); + + fut.get(2 * 60_000); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java index d983302..9cda1b4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java @@ -85,13 +85,16 @@ public class IgfsClientCacheSelfTest extends IgfsAbstractSelfTest { cfg.setCacheConfiguration(cacheConfiguration(META_CACHE_NAME), cacheConfiguration(DATA_CACHE_NAME), cacheConfiguration(CACHE_NAME)); - if (!gridName.equals(getTestGridName(0))) - cfg.setClientMode(true); - TcpDiscoverySpi disco = new TcpDiscoverySpi(); disco.setIpFinder(IP_FINDER); + if (!gridName.equals(getTestGridName(0))) { + cfg.setClientMode(true); + + disco.setForceServerMode(true); + } + cfg.setDiscoverySpi(disco); FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); @@ -114,11 +117,12 @@ public class IgfsClientCacheSelfTest extends IgfsAbstractSelfTest { cacheCfg.setName(cacheName); + cacheCfg.setNearConfiguration(null); + if (META_CACHE_NAME.equals(cacheName)) cacheCfg.setCacheMode(REPLICATED); else { cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setNearConfiguration(null); cacheCfg.setBackups(0); cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCommonAbstractTest.java index fa7f048..e879130 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCommonAbstractTest.java @@ -46,16 +46,6 @@ public class IgfsCommonAbstractTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(IgniteTestResources rsrcs) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(rsrcs); - - cfg.setPeerClassLoadingEnabled(false); - cfg.setLocalHost("127.0.0.1"); - - return cfg; - } - - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName, IgniteTestResources rsrcs) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName, rsrcs); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsOneClientNodeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsOneClientNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsOneClientNodeTest.java index 49ddb03..3498cd9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsOneClientNodeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsOneClientNodeTest.java @@ -52,11 +52,9 @@ public class IgfsOneClientNodeTest extends GridCommonAbstractTest { cfg.setClientMode(true); - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - cfg.setDiscoverySpi(discoSpi); + cfg.setDiscoverySpi(new TcpDiscoverySpi() + .setForceServerMode(true) + .setIpFinder(new TcpDiscoveryVmIpFinder(true))); FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java index 761f00f..faccc9a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java @@ -53,11 +53,7 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { cfg.setMarshaller(new OptimizedMarshaller(false)); - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoSpi); + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder).setForceServerMode(true)); cfg.setCacheConfiguration(); @@ -88,6 +84,8 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { srvNames.add(getTestGridName(i)); for (int i = 0 ; i < NODES_CNT; i++) { + log.info("Iteration: " + i); + Ignite ignite = grid(i); Collection<String> res = ignite.compute().broadcast(new IgniteCallable<String>() { @@ -113,6 +111,8 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { */ public void testClientClosure() throws Exception { for (int i = 0 ; i < NODES_CNT; i++) { + log.info("Iteration: " + i); + Ignite ignite = grid(i); Collection<String> res = ignite.compute(ignite.cluster().forClients()). @@ -138,6 +138,8 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { */ public void testCustomClosure() throws Exception { for (int i = 0 ; i < NODES_CNT; i++) { + log.info("Iteration: " + i); + Ignite ignite = grid(i); Collection<String> res = ignite.compute(ignite.cluster().forPredicate(F.<ClusterNode>alwaysTrue())). @@ -161,6 +163,8 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { UUID clientNodeId = grid(0).cluster().localNode().id(); for (int i = 0 ; i < NODES_CNT; i++) { + log.info("Iteration: " + i); + Ignite ignite = grid(i); ignite.services().deployNodeSingleton(SINGLETON_NAME, new TestService()); @@ -194,6 +198,8 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { UUID clientNodeId = grid(0).cluster().localNode().id(); for (int i = 0 ; i < NODES_CNT; i++) { + log.info("Iteration: " + i); + Ignite ignite = grid(i); ignite.services(ignite.cluster().forClients()).deployNodeSingleton(SINGLETON_NAME, new TestService()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java new file mode 100644 index 0000000..404c32b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; + +/** + * + */ +public class GridServiceClientNodeTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODE_CNT = 3; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + if (gridName.equals(getTestGridName(NODE_CNT - 1))) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(NODE_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testDeployFromClient() throws Exception { + Ignite ignite = ignite(NODE_CNT - 1); + + assertTrue(ignite.configuration().isClientMode()); + + String svcName = "testService"; + + CountDownLatch latch = new CountDownLatch(1); + + DummyService.exeLatch(svcName, latch); + + ignite.services().deployClusterSingleton(svcName, new DummyService()); + + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferSelfTest.java new file mode 100644 index 0000000..cbf7d89 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferSelfTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import junit.framework.TestCase; + +import java.nio.*; +import java.util.*; + +/** + * Tests for {@link GridNioDelimitedBuffer}. + */ +public class GridNioDelimitedBufferSelfTest extends TestCase { + /** */ + private static final String ASCII = "ASCII"; + + /** + * Tests simple delimiter (excluded from alphabet) + */ + public void testReadZString() throws Exception { + Random rnd = new Random(); + + int buffSize = 0; + + byte[] delim = new byte[] {0}; + + List<String> strs = new ArrayList<>(50); + + for (int i = 0; i < 50; i++) { + int len = rnd.nextInt(128) + 1; + + buffSize += len + delim.length; + + StringBuilder sb = new StringBuilder(len); + + for (int j = 0; j < len; j++) + sb.append((char)(rnd.nextInt(26) + 'a')); + + + strs.add(sb.toString()); + } + + ByteBuffer buff = ByteBuffer.allocate(buffSize); + + for (String str : strs) { + buff.put(str.getBytes(ASCII)); + buff.put(delim); + } + + buff.flip(); + + byte[] msg; + + GridNioDelimitedBuffer delimBuff = new GridNioDelimitedBuffer(delim); + + List<String> res = new ArrayList<>(strs.size()); + + while ((msg = delimBuff.read(buff)) != null) + res.add(new String(msg, ASCII)); + + assertEquals(strs, res); + } + + /** + * Tests compound delimiter (included to alphabet) + */ + public void testDelim() throws Exception { + byte[] delim = "aabb".getBytes(ASCII); + + List<String> strs = Arrays.asList("za", "zaa", "zaab", "zab", "zaabaababbbbabaab"); + + int buffSize = 0; + + for (String str : strs) + buffSize += str.length() + delim.length; + + ByteBuffer buff = ByteBuffer.allocate(buffSize); + + for (String str : strs) { + buff.put(str.getBytes(ASCII)); + buff.put(delim); + } + + buff.flip(); + + byte[] msg; + + GridNioDelimitedBuffer delimBuff = new GridNioDelimitedBuffer(delim); + + List<String> res = new ArrayList<>(strs.size()); + + while ((msg = delimBuff.read(buff)) != null) + res.add(new String(msg, ASCII)); + + assertEquals(strs, res); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java index e3baeb0..bdf9929 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java @@ -1286,7 +1286,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { } /** - * Test client to use instead of {@link GridTcpCommunicationClient} + * Test client to use instead of {@link GridTcpNioCommunicationClient} */ private static class TestClient implements AutoCloseable { /** Socket implementation to use. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java index d106f5b..0030ce1 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java @@ -65,7 +65,10 @@ public class GridCacheMultiNodeLoadTest extends GridCommonAbstractTest { cacheCfg.setStartSize(10); cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - cacheCfg.setEvictionPolicy(new LruEvictionPolicy(100000)); + LruEvictionPolicy plc = new LruEvictionPolicy(); + plc.setMaxSize(100000); + + cacheCfg.setEvictionPolicy(plc); cacheCfg.setBackups(1); cacheCfg.setRebalanceMode(SYNC); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java index 6960fa1..0d9ec8f 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java @@ -70,7 +70,11 @@ public class GridCachePartitionedAtomicLongLoadTest extends GridCommonAbstractTe cc.setStartSize(200); cc.setRebalanceMode(CacheRebalanceMode.SYNC); cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setEvictionPolicy(new LruEvictionPolicy<>(1000)); + + LruEvictionPolicy plc = new LruEvictionPolicy(); + plc.setMaxSize(1000); + + cc.setEvictionPolicy(plc); cc.setBackups(1); cc.setAffinity(new RendezvousAffinityFunction(true)); cc.setEvictSynchronized(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 0b0f099..77d3905 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -54,11 +54,13 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { new GridCacheMvccManager(), new GridCacheDeploymentManager<K, V>(), new GridCachePartitionExchangeManager<K, V>(), - new GridCacheIoManager() + new GridCacheIoManager(), + null ), defaultCacheConfiguration(), CacheType.USER, true, + true, new GridCacheEventManager(), new GridCacheSwapManager(false), new CacheOsStoreManager(null, new CacheConfiguration()), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/loadtests/swap/GridSwapEvictAllBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/swap/GridSwapEvictAllBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/swap/GridSwapEvictAllBenchmark.java index 1561b77..62066da 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/swap/GridSwapEvictAllBenchmark.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/swap/GridSwapEvictAllBenchmark.java @@ -266,7 +266,11 @@ public class GridSwapEvictAllBenchmark { ccfg.setSwapEnabled(true); ccfg.setEvictSynchronized(false); - ccfg.setEvictionPolicy(new FifoEvictionPolicy(EVICT_PLC_SIZE)); + + FifoEvictionPolicy plc = new FifoEvictionPolicy(); + plc.setMaxSize(EVICT_PLC_SIZE); + + ccfg.setEvictionPolicy(plc); if (store != null) { ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java index 07fd9e3..35abf7e 100644 --- a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java @@ -113,7 +113,7 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest cache = true; - Ignite ignite = startGrid(2); // Check can start on more cache node. + Ignite ignite = startGrid(2); // Check can start one more cache node. assertNotNull(ignite.cache(null)); } @@ -122,7 +122,7 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest * @throws Exception If failed. */ public void testRestartAllNodes() throws Exception { - cache = false; + cache = true; String home = U.getIgniteHome(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingNoPeerClassLoadingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingNoPeerClassLoadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingNoPeerClassLoadingSelfTest.java index 610ce64..37d2ec7 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingNoPeerClassLoadingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingNoPeerClassLoadingSelfTest.java @@ -31,6 +31,9 @@ import java.util.concurrent.atomic.*; * peer class loading. */ public class GridMessagingNoPeerClassLoadingSelfTest extends GridMessagingSelfTest { + /** */ + private static CountDownLatch rcvLatch; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -56,9 +59,9 @@ public class GridMessagingNoPeerClassLoadingSelfTest extends GridMessagingSelfTe final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable - final CountDownLatch rcvLatch = new CountDownLatch(1); + rcvLatch = new CountDownLatch(1); - ignite2.message().remoteListen("", new P2<UUID, Object>() { + ignite2.message().remoteListen(null, new P2<UUID, Object>() { @Override public boolean apply(UUID nodeId, Object msg) { try { log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index c033750..b7838be 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -45,7 +45,7 @@ import static org.apache.ignite.testframework.GridTestUtils.*; /** * Various tests for Messaging public API. */ -public class GridMessagingSelfTest extends GridCommonAbstractTest { +public class GridMessagingSelfTest extends GridCommonAbstractTest implements Serializable { /** */ private static final String MSG_1 = "MSG-1"; @@ -74,7 +74,10 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { public static final String EXT_RESOURCE_CLS_NAME = "org.apache.ignite.tests.p2p.TestUserResource"; /** Shared IP finder. */ - private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + private final transient TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + protected static CountDownLatch rcvLatch; /** * A test message topic. @@ -609,7 +612,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { public void testRemoteListen() throws Exception { final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>(); - final CountDownLatch rcvLatch = new CountDownLatch(4); + rcvLatch = new CountDownLatch(4); ignite2.message().remoteListen(null, new P2<UUID, Object>() { @Override public boolean apply(UUID nodeId, Object msg) { @@ -746,7 +749,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable - final CountDownLatch rcvLatch = new CountDownLatch(3); + rcvLatch = new CountDownLatch(3); ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() { @Override public boolean apply(UUID nodeId, Object msg) { @@ -795,7 +798,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable - final CountDownLatch rcvLatch = new CountDownLatch(3); + rcvLatch = new CountDownLatch(3); ignite2.message().remoteListen(I_TOPIC_1, new P2<UUID, Object>() { @IgniteInstanceResource http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java new file mode 100644 index 0000000..09abcdb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.messaging; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class IgniteMessagingWithClientTest extends GridCommonAbstractTest implements Serializable { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Message topic. */ + private enum TOPIC { + /** */ + ORDERED + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new OptimizedMarshaller(false)); + + if (gridName.equals(getTestGridName(2))) { + cfg.setClientMode(true); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); + } + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testMessageSendWithClientJoin() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-996"); + + startGrid(0); + + Ignite ignite1 = startGrid(1); + + ClusterGroup rmts = ignite1.cluster().forRemotes(); + + IgniteMessaging msg = ignite1.message(rmts); + + msg.localListen(TOPIC.ORDERED, new LocalListener()); + + msg.remoteListen(TOPIC.ORDERED, new RemoteListener()); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int iter = 0; + + while (!stop.get()) { + if (iter % 10 == 0) + log.info("Client start/stop iteration: " + iter); + + iter++; + + try (Ignite ignite = startGrid(2)) { + assertTrue(ignite.configuration().isClientMode()); + } + } + + return null; + } + }, 1, "client-start-stop"); + + try { + long stopTime = U.currentTimeMillis() + 30_000; + + int iter = 0; + + while (System.currentTimeMillis() < stopTime) { + try { + ignite1.message(rmts).sendOrdered(TOPIC.ORDERED, Integer.toString(iter), 0); + } + catch (IgniteException e) { + log.info("Message send failed: " + e); + } + + iter++; + + if (iter % 100 == 0) + Thread.sleep(5); + } + } + finally { + stop.set(true); + } + + fut.get(); + } + + /** + * + */ + private static class LocalListener implements IgniteBiPredicate<UUID, String> { + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, String s) { + return true; + } + } + + /** + * + */ + private static class RemoteListener implements IgniteBiPredicate<UUID, String> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean apply(UUID nodeId, String msg) { + ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.ORDERED, msg); + + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index ea51aff..8d27485 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -64,7 +64,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica // Test idle clients remove. for (CommunicationSpi spi : spis.values()) { - ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients"); + ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients"); assertEquals(2, clients.size()); @@ -77,7 +77,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica super.afterTest(); for (CommunicationSpi spi : spis.values()) { - ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients"); + ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients"); for (int i = 0; i < 20 && !clients.isEmpty(); i++) { info("Check failed for SPI [grid=" + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index c038ee7..2d175f5 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -256,7 +256,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic assertTrue(latch.await(10, TimeUnit.SECONDS)); for (CommunicationSpi spi : spis) { - ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients"); + ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients"); assertEquals(1, clients.size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java index 34fa610..c4a0916 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java @@ -32,8 +32,6 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig checkNegativeSpiProperty(new TcpCommunicationSpi(), "localPort", 65636); checkNegativeSpiProperty(new TcpCommunicationSpi(), "localPortRange", -1); checkNegativeSpiProperty(new TcpCommunicationSpi(), "idleConnectionTimeout", 0); - checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionBufferSize", -1); - checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionBufferFlushFrequency", 0); checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketReceiveBuffer", -1); checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketSendBuffer", -1); checkNegativeSpiProperty(new TcpCommunicationSpi(), "messageQueueLimit", -1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index e7ae957..3916f02 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@ -501,7 +501,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac } for (CommunicationSpi spi : spis.values()) { - final ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients"); + final ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients"); assert GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java index 3c61f00..61bb944 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java @@ -22,9 +22,11 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.spi.*; +import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.config.*; import org.apache.ignite.testframework.junits.*; import org.apache.ignite.testframework.junits.spi.*; +import org.jetbrains.annotations.*; import javax.management.*; import java.io.*; @@ -132,7 +134,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri /** {@inheritDoc} */ @Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, - Map<Long, Collection<ClusterNode>> topHist, Serializable data) { + Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage data) { if (type == EVT_NODE_METRICS_UPDATED) isMetricsUpdate = true; } @@ -205,7 +207,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri DiscoverySpiListener locHeartbeatLsnr = new DiscoverySpiListener() { @Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, - Serializable data) { + @Nullable DiscoverySpiCustomMessage data) { // If METRICS_UPDATED came from local node if (type == EVT_NODE_METRICS_UPDATED && node.id().equals(spi.getLocalNode().id())) @@ -266,9 +268,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri Collection<UUID> nodeIds = new HashSet<>(); - for (IgniteTestResources rsrc : spiRsrcs) { + for (IgniteTestResources rsrc : spiRsrcs) nodeIds.add(rsrc.getNodeId()); - } for (ClusterNode node : spi.getRemoteNodes()) { if (nodeIds.contains(node.id())) { @@ -369,7 +370,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri spi.setListener(new DiscoverySpiListener() { @SuppressWarnings({"NakedNotify"}) @Override public void onDiscovery(int type, long topVer, ClusterNode node, - Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, Serializable data) { + Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, + @Nullable DiscoverySpiCustomMessage data) { info("Discovery event [type=" + type + ", node=" + node + ']'); synchronized (mux) { @@ -388,6 +390,10 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri } }); + GridSpiTestContext ctx = initSpiContext(); + + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "spiCtx", ctx); + spi.spiStart(getTestGridName() + i); spis.add(spi); @@ -395,7 +401,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri spiRsrcs.add(rsrcMgr); // Force to use test context instead of default dummy context. - spi.onContextInitialized(initSpiContext()); + spi.onContextInitialized(ctx); } } catch (Throwable e) { @@ -436,9 +442,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri spi.spiStop(); } - for (IgniteTestResources rscrs : spiRsrcs) { + for (IgniteTestResources rscrs : spiRsrcs) rscrs.stopThreads(); - } // Clear. spis.clear(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoveryMarshallerCheckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoveryMarshallerCheckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoveryMarshallerCheckSelfTest.java new file mode 100644 index 0000000..016854a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoveryMarshallerCheckSelfTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * Test for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}. + */ +public class TcpClientDiscoveryMarshallerCheckSelfTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (gridName.endsWith("0")) + cfg.setMarshaller(new JdkMarshaller()); + else { + cfg.setClientMode(true); + + cfg.setMarshaller(new OptimizedMarshaller()); + } + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testMarshallerInConsistency() throws Exception { + startGrid(0); + + try { + startGrid(1); + + fail("Expected SPI exception was not thrown."); + } + catch (IgniteCheckedException e) { + Throwable ex = e.getCause().getCause(); + + assertTrue(ex instanceof IgniteSpiException); + assertTrue(ex.getMessage().contains("Local node's marshaller differs from remote node's marshaller")); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java deleted file mode 100644 index 0c9f2f2..0000000 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java +++ /dev/null @@ -1,700 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.events.EventType.*; - -/** - * Client-based discovery tests. - */ -public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final AtomicInteger srvIdx = new AtomicInteger(); - - /** */ - private static final AtomicInteger clientIdx = new AtomicInteger(); - - /** */ - private static Collection<UUID> srvNodeIds; - - /** */ - private static Collection<UUID> clientNodeIds; - - /** */ - private static int clientsPerSrv; - - /** */ - private static CountDownLatch srvJoinedLatch; - - /** */ - private static CountDownLatch srvLeftLatch; - - /** */ - private static CountDownLatch srvFailedLatch; - - /** */ - private static CountDownLatch clientJoinedLatch; - - /** */ - private static CountDownLatch clientLeftLatch; - - /** */ - private static CountDownLatch clientFailedLatch; - - /** */ - private static CountDownLatch msgLatch; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setLocalHost("127.0.0.1"); - - if (gridName.startsWith("server")) { - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(disco); - } - else if (gridName.startsWith("client")) { - TcpClientDiscoverySpi disco = new TcpClientDiscoverySpi(); - - TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); - - String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()). - get((clientIdx.get() - 1) / clientsPerSrv).toString(); - - if (addr.startsWith("/")) - addr = addr.substring(1); - - ipFinder.setAddresses(Arrays.asList(addr)); - - disco.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(disco); - } - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses(); - - if (!F.isEmpty(addrs)) - IP_FINDER.unregisterAddresses(addrs); - - srvIdx.set(0); - clientIdx.set(0); - - srvNodeIds = new GridConcurrentHashSet<>(); - clientNodeIds = new GridConcurrentHashSet<>(); - - clientsPerSrv = 2; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllClients(true); - stopAllServers(true); - - assert G.allGrids().isEmpty(); - } - - /** - * @throws Exception If failed. - */ - public void testClientNodeJoin() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvJoinedLatch = new CountDownLatch(3); - clientJoinedLatch = new CountDownLatch(3); - - attachListeners(3, 3); - - startClientNodes(1); - - await(srvJoinedLatch); - await(clientJoinedLatch); - - checkNodes(3, 4); - } - - /** - * @throws Exception If failed. - */ - public void testClientNodeLeave() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvLeftLatch = new CountDownLatch(3); - clientLeftLatch = new CountDownLatch(2); - - attachListeners(3, 3); - - stopGrid("client-2"); - - await(srvLeftLatch); - await(clientLeftLatch); - - checkNodes(3, 2); - } - - /** - * @throws Exception If failed. - */ - public void testClientNodeFail() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvFailedLatch = new CountDownLatch(3); - clientFailedLatch = new CountDownLatch(2); - - attachListeners(3, 3); - - failClient(2); - - await(srvFailedLatch); - await(clientFailedLatch); - - checkNodes(3, 2); - } - - /** - * @throws Exception If failed. - */ - public void testServerNodeJoin() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvJoinedLatch = new CountDownLatch(3); - clientJoinedLatch = new CountDownLatch(3); - - attachListeners(3, 3); - - startServerNodes(1); - - await(srvJoinedLatch); - await(clientJoinedLatch); - - checkNodes(4, 3); - } - - /** - * @throws Exception If failed. - */ - public void testServerNodeLeave() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvLeftLatch = new CountDownLatch(2); - clientLeftLatch = new CountDownLatch(3); - - attachListeners(3, 3); - - stopGrid("server-2"); - - await(srvLeftLatch); - await(clientLeftLatch); - - checkNodes(2, 3); - } - - /** - * @throws Exception If failed. - */ - public void testServerNodeFail() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvFailedLatch = new CountDownLatch(2); - clientFailedLatch = new CountDownLatch(3); - - attachListeners(3, 3); - - assert U.<Map>field(G.ignite("server-2").configuration().getDiscoverySpi(), "clientMsgWorkers").isEmpty(); - - failServer(2); - - await(srvFailedLatch); - await(clientFailedLatch); - - checkNodes(2, 3); - } - - /** - * TODO: IGNITE-587. - * @throws Exception If failed. - */ - public void testClientReconnect() throws Exception { - fail("ignite-587"); - - clientsPerSrv = 1; - - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - resetClientIpFinder(2); - - srvFailedLatch = new CountDownLatch(2); - clientFailedLatch = new CountDownLatch(3); - - attachListeners(2, 3); - - failServer(2); - - await(srvFailedLatch); - await(clientFailedLatch); - - checkNodes(2, 3); - } - - /** - * @throws Exception If failed. - */ - public void testClientNodeJoinOneServer() throws Exception { - startServerNodes(1); - - srvJoinedLatch = new CountDownLatch(1); - - attachListeners(1, 0); - - startClientNodes(1); - - await(srvJoinedLatch); - - checkNodes(1, 1); - } - - /** - * TODO: IGNITE-587. - * @throws Exception If failed. - */ - public void testClientNodeLeaveOneServer() throws Exception { - fail("ignite-587"); - - startServerNodes(1); - startClientNodes(1); - - checkNodes(1, 1); - - srvLeftLatch = new CountDownLatch(1); - - attachListeners(1, 0); - - stopGrid("client-0"); - - await(srvLeftLatch); - - checkNodes(1, 0); - } - - /** - * @throws Exception If failed. - */ - public void testClientNodeFailOneServer() throws Exception { - startServerNodes(1); - startClientNodes(1); - - checkNodes(1, 1); - - srvFailedLatch = new CountDownLatch(1); - - attachListeners(1, 0); - - failClient(0); - - await(srvFailedLatch); - - checkNodes(1, 0); - } - - /** - * @throws Exception If failed. - */ - public void testMetrics() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - attachListeners(3, 3); - - assertTrue(checkMetrics(3, 3, 0)); - - G.ignite("client-0").compute().broadcast(F.noop()); - - assertTrue(GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return checkMetrics(3, 3, 1); - } - }, 10000)); - - checkMetrics(3, 3, 1); - - G.ignite("server-0").compute().broadcast(F.noop()); - - assertTrue(GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return checkMetrics(3, 3, 2); - } - }, 10000)); - } - - /** - * @param srvCnt Number of Number of server nodes. - * @param clientCnt Number of client nodes. - * @param execJobsCnt Expected number of executed jobs. - * @return Whether metrics are correct. - */ - private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) { - for (int i = 0; i < srvCnt; i++) { - Ignite g = G.ignite("server-" + i); - - for (ClusterNode n : g.cluster().nodes()) { - if (n.metrics().getTotalExecutedJobs() != execJobsCnt) - return false; - } - } - - for (int i = 0; i < clientCnt; i++) { - Ignite g = G.ignite("client-" + i); - - for (ClusterNode n : g.cluster().nodes()) { - if (n.metrics().getTotalExecutedJobs() != execJobsCnt) - return false; - } - } - - return true; - } - - /** - * @throws Exception If failed. - */ - public void testDataExchangeFromServer() throws Exception { - testDataExchange("server-0"); - } - - /** - * TODO: IGNITE-587. - * - * @throws Exception If failed. - */ - public void testDataExchangeFromClient() throws Exception { - fail("ignite-587"); - - testDataExchange("client-0"); - } - - /** - * @throws Exception If failed. - */ - private void testDataExchange(String masterName) throws Exception { - startServerNodes(2); - startClientNodes(2); - - checkNodes(2, 2); - - IgniteMessaging msg = grid(masterName).message(); - - UUID id = null; - - try { - id = msg.remoteListen(null, new MessageListener()); - - msgLatch = new CountDownLatch(4); - - msg.send(null, "Message 1"); - - await(msgLatch); - - startServerNodes(1); - startClientNodes(1); - - checkNodes(3, 3); - - msgLatch = new CountDownLatch(6); - - msg.send(null, "Message 2"); - - await(msgLatch); - } - finally { - if (id != null) - msg.stopRemoteListen(id); - } - } - - /** - * @param idx Index. - * @throws Exception In case of error. - */ - private void resetClientIpFinder(int idx) throws Exception { - TcpClientDiscoverySpi disco = - (TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi(); - - TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder(); - - String addr = IP_FINDER.getRegisteredAddresses().iterator().next().toString(); - - if (addr.startsWith("/")) - addr = addr.substring(1); - - ipFinder.setAddresses(Arrays.asList(addr)); - } - - /** - * @param cnt Number of nodes. - * @throws Exception In case of error. - */ - private void startServerNodes(int cnt) throws Exception { - for (int i = 0; i < cnt; i++) { - Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); - - srvNodeIds.add(g.cluster().localNode().id()); - } - } - - /** - * @param cnt Number of nodes. - * @throws Exception In case of error. - */ - private void startClientNodes(int cnt) throws Exception { - for (int i = 0; i < cnt; i++) { - Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); - - clientNodeIds.add(g.cluster().localNode().id()); - } - } - - /** - * @param idx Index. - */ - private void failServer(int idx) { - ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); - } - - /** - * @param idx Index. - */ - private void failClient(int idx) { - ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); - } - - /** - * @param srvCnt Number of server nodes. - * @param clientCnt Number of client nodes. - */ - private void attachListeners(int srvCnt, int clientCnt) throws Exception { - if (srvJoinedLatch != null) { - for (int i = 0; i < srvCnt; i++) { - G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - info("Joined event fired on server: " + evt); - - srvJoinedLatch.countDown(); - - return true; - } - }, EVT_NODE_JOINED); - } - } - - if (srvLeftLatch != null) { - for (int i = 0; i < srvCnt; i++) { - G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - info("Left event fired on server: " + evt); - - srvLeftLatch.countDown(); - - return true; - } - }, EVT_NODE_LEFT); - } - } - - if (srvFailedLatch != null) { - for (int i = 0; i < srvCnt; i++) { - G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - info("Failed event fired on server: " + evt); - - srvFailedLatch.countDown(); - - return true; - } - }, EVT_NODE_FAILED); - } - } - - if (clientJoinedLatch != null) { - for (int i = 0; i < clientCnt; i++) { - G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - info("Joined event fired on client: " + evt); - - clientJoinedLatch.countDown(); - - return true; - } - }, EVT_NODE_JOINED); - } - } - - if (clientLeftLatch != null) { - for (int i = 0; i < clientCnt; i++) { - G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - info("Left event fired on client: " + evt); - - clientLeftLatch.countDown(); - - return true; - } - }, EVT_NODE_LEFT); - } - } - - if (clientFailedLatch != null) { - for (int i = 0; i < clientCnt; i++) { - G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - info("Failed event fired on client: " + evt); - - clientFailedLatch.countDown(); - - return true; - } - }, EVT_NODE_FAILED); - } - } - } - - /** - * @param srvCnt Number of server nodes. - * @param clientCnt Number of client nodes. - */ - private void checkNodes(int srvCnt, int clientCnt) { - for (int i = 0; i < srvCnt; i++) { - Ignite g = G.ignite("server-" + i); - - assertTrue(srvNodeIds.contains(g.cluster().localNode().id())); - - assertFalse(g.cluster().localNode().isClient()); - - checkRemoteNodes(g, srvCnt + clientCnt - 1); - } - - for (int i = 0; i < clientCnt; i++) { - Ignite g = G.ignite("client-" + i); - - assertTrue(clientNodeIds.contains(g.cluster().localNode().id())); - - assertTrue(g.cluster().localNode().isClient()); - - checkRemoteNodes(g, srvCnt + clientCnt - 1); - } - } - - /** - * @param ignite Grid. - * @param expCnt Expected nodes count. - */ - @SuppressWarnings("TypeMayBeWeakened") - private void checkRemoteNodes(Ignite ignite, int expCnt) { - Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes(); - - assertEquals(expCnt, nodes.size()); - - for (ClusterNode node : nodes) { - UUID id = node.id(); - - if (clientNodeIds.contains(id)) - assertTrue(node.isClient()); - else if (srvNodeIds.contains(id)) - assertFalse(node.isClient()); - else - assert false : "Unexpected node ID: " + id; - } - } - - /** - * @param latch Latch. - * @throws InterruptedException If interrupted. - */ - private void await(CountDownLatch latch) throws InterruptedException { - assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS)); - } - - /** - */ - private static class MessageListener implements IgniteBiPredicate<UUID, Object> { - @IgniteInstanceResource - private Ignite ignite; - - /** {@inheritDoc} */ - @Override public boolean apply(UUID uuid, Object msg) { - X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']'); - - msgLatch.countDown(); - - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java new file mode 100644 index 0000000..d1b6232 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +/** + * + */ +public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest { + /** */ + private boolean forceSrv; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost("127.0.0.1"); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(new TcpDiscoveryMulticastIpFinder()); + + if (getTestGridName(1).equals(gridName)) { + cfg.setClientMode(true); + + spi.setForceServerMode(forceSrv); + } + + cfg.setDiscoverySpi(spi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + /** + * @throws Exception If failed. + */ + public void testJoinWithMulticast() throws Exception { + joinWithMulticast(); + } + + /** + * @throws Exception If failed. + */ + public void testJoinWithMulticastForceServer() throws Exception { + forceSrv = true; + + joinWithMulticast(); + } + + /** + * @throws Exception If failed. + */ + private void joinWithMulticast() throws Exception { + Ignite ignite0 = startGrid(0); + + assertSpi(ignite0, false); + + Ignite ignite1 = startGrid(1); + + assertSpi(ignite1, !forceSrv); + + assertTrue(ignite1.configuration().isClientMode()); + + assertEquals(2, ignite0.cluster().nodes().size()); + assertEquals(2, ignite1.cluster().nodes().size()); + + Ignite ignite2 = startGrid(2); + + assertSpi(ignite2, false); + + assertEquals(3, ignite0.cluster().nodes().size()); + assertEquals(3, ignite1.cluster().nodes().size()); + assertEquals(3, ignite2.cluster().nodes().size()); + } + + /** + * @param ignite Ignite. + * @param client Expected client mode flag. + */ + private void assertSpi(Ignite ignite, boolean client) { + DiscoverySpi spi = ignite.configuration().getDiscoverySpi(); + + assertSame(TcpDiscoverySpi.class, spi.getClass()); + + TcpDiscoverySpi spi0 = (TcpDiscoverySpi)spi; + + assertSame(TcpDiscoveryMulticastIpFinder.class, spi0.getIpFinder().getClass()); + + assertEquals(client, spi0.isClientMode()); + + Collection<Object> addrSnds = GridTestUtils.getFieldValue(spi0.getIpFinder(), "addrSnds"); + + assertNotNull(addrSnds); + + if (client) + assertTrue(addrSnds.isEmpty()); // Check client does not send its address. + else + assertFalse(addrSnds.isEmpty()); + } +}