http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheSingleNodeLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheSingleNodeLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheSingleNodeLoadTest.java new file mode 100644 index 0000000..d034593 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheSingleNodeLoadTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.thread.*; +import org.apache.ignite.spi.collision.fifoqueue.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** Single-node cache load test: worker threads put new {@link Student} values under unique keys and throughput figures are printed. + */ +public class GridCacheSingleNodeLoadTest { + /** Number of worker threads; the executor pool sizes are derived from it as well. */ + private static final int THREADS = 200; + + /** Starts the grid, runs the put test with 200 and then 1000 puts per thread, and stops the grid even on failure. + * @param args Arguments (not read). + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + start(); + + try { + runTest(200, THREADS); + + runTest(1000, THREADS); + } + finally { + stop(); + } + } + + /** Runs the put load on {@code userThreads} threads and prints puts per second and summed per-thread time divided by the put count. + * @param putCnt Number of puts per thread. + * @param userThreads Number of user threads. + * @throws Exception If failed. 
+ */ + private static void runTest(final int putCnt, int userThreads) throws Exception { + final AtomicInteger keyGen = new AtomicInteger(); + + final AtomicLong totalTime = new AtomicLong(); + + final AtomicInteger txCntr = new AtomicInteger(); + + X.println("Starting multithread test with thread count: " + userThreads); + + long start = System.currentTimeMillis(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + GridCache<Integer, Student> cache = G.ignite().cache(null); + + assert cache != null; + + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < putCnt; i++) { + cache.putx(keyGen.incrementAndGet(), new Student()); + + int cnt = txCntr.incrementAndGet(); + + if (cnt % 5000 == 0) + X.println("Processed transactions: " + cnt); + } + + totalTime.addAndGet(System.currentTimeMillis() - startTime); + + return null; + } + }, userThreads, "load-worker"); + + long time = System.currentTimeMillis() - start; /* NOTE(review): the two divisions below throw ArithmeticException if time or the put count is 0. */ + + X.println("Average tx/sec: " + (txCntr.get() * 1000 / time)); + X.println("Average commit time (ms): " + (totalTime.get() / txCntr.get())); + } + + /** Starts a grid node configured with a single partitioned cache. + * @throws Exception If failed. 
+ */ + private static void start() throws Exception { + IgniteConfiguration c = new IgniteConfiguration(); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + c.setDiscoverySpi(disco); + + FifoQueueCollisionSpi cols = new FifoQueueCollisionSpi(); + + cols.setParallelJobsNumber(Integer.MAX_VALUE); + + c.setCollisionSpi(cols); + + c.setExecutorService(new IgniteThreadPoolExecutor(THREADS / 2, THREADS / 2, 0L, new LinkedBlockingQueue<Runnable>())); + c.setSystemExecutorService(new IgniteThreadPoolExecutor(THREADS * 2, THREADS * 2, 0L, + new LinkedBlockingQueue<Runnable>())); + + CacheConfiguration cc = new CacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setBackups(1); + cc.setNearEvictionPolicy(new GridCacheLruEvictionPolicy(10000)); + cc.setEvictionPolicy(new GridCacheLruEvictionPolicy(300000)); + cc.setSwapEnabled(false); + cc.setDistributionMode(PARTITIONED_ONLY); + + c.setCacheConfiguration(cc); + + G.start(c); + } + + /** + * Stops the grid. + */ + private static void stop() { + G.stop(true); + } + + /** + * Value type stored in the cache by the test. + */ + @SuppressWarnings({"PublicInnerClass"}) + public static class Student { + /** Random id generated in the constructor. */ + private final UUID id; + + /** + * Creates a student with a random id. + */ + Student() { + id = UUID.randomUUID(); + } + + /** + * @return Id. + */ + public UUID id() { + return id; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Student.class, this); + } + } + + /** + * Private constructor; only static members are used. + */ + private GridCacheSingleNodeLoadTest() { + // No-op. + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheSwapLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheSwapLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheSwapLoadTest.java new file mode 100644 index 0000000..abc5ce0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheSwapLoadTest.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Cache+swap load test. 
+ */ +public class GridCacheSwapLoadTest { + /** Progress messages are printed once per this many operations. */ + private static final int LOG_MOD = 10000; + + /** Default number of keys to put. */ + private static final int DFLT_KEY_CNT = 100000; + + /** Default fraction of the key count that must be swapped before get/remove threads are released. */ + private static final float DFLT_GET_REMOVE_RATIO = 0.2f; + + /** Default number of put threads. */ + private static final int DFLT_PUT_THREAD_CNT = 5; + + /** Default number of get threads. */ + private static final int DFLT_GET_THREAD_CNT = 2; + + /** Default number of remove threads. */ + private static final int DFLT_REMOVE_THREAD_CNT = 2; + + /** Whether get/remove threads run by default. */ + private static final boolean DFLT_GET_REMOVE_ENABLED = true; + + /** Number of keys to put (option {@code -k}). */ + private static int keyCnt = DFLT_KEY_CNT; + + /** Get/remove start ratio (option {@code -r}). */ + private static float getRmvRatio = DFLT_GET_REMOVE_RATIO; + + /** Number of put threads (option {@code -pt}). */ + private static int putThreadCnt = DFLT_PUT_THREAD_CNT; + + /** Number of get threads (option {@code -gt}). */ + private static int getThreadCnt = DFLT_GET_THREAD_CNT; + + /** Number of remove threads (option {@code -rt}). */ + private static int rmvThreadCnt = DFLT_REMOVE_THREAD_CNT; + + /** Whether get/remove threads are started (cleared by option {@code -dgr}). */ + private static boolean getRmvEnabled = DFLT_GET_REMOVE_ENABLED; + + /** Counted down once enough keys have been swapped; get/remove threads wait on it. */ + private static final CountDownLatch getRemoveStartedLatch = new CountDownLatch(1); + + /** Keys reported by swap events, consumed by the get and remove threads. */ + private static final BlockingQueue<Integer> swappedKeys = new LinkedBlockingQueue<>(); + + /** Private constructor; only static members are used. */ + private GridCacheSwapLoadTest() { + // No-op + } + + /** Parses options, starts the grid from the swap Spring config, listens for swap events, runs put (and optionally get/remove) threads and prints the elapsed time. + * @param args Command line arguments. + * @throws IgniteCheckedException In case of error. 
+ */ + public static void main(String[] args) throws IgniteCheckedException { + parseArgs(args); + + try (Ignite g = G.start("modules/core/src/test/config/spring-cache-swap.xml")) { + g.events().localListen(new IgnitePredicate<IgniteEvent>() { + private final AtomicInteger cnt = new AtomicInteger(0); + + private final AtomicBoolean getRmvStartedGuard = new AtomicBoolean(false); + + @Override public boolean apply(IgniteEvent evt) { + int cnt = this.cnt.incrementAndGet(); + + if (cnt % LOG_MOD == 0) + X.println(">>> Swap count: " + cnt); + + if (getRmvEnabled) { + IgniteCacheEvent ce = (IgniteCacheEvent) evt; + + Integer key = ce.key(); + + swappedKeys.add(key); + + if (swappedKeys.size() > keyCnt * getRmvRatio && + getRmvStartedGuard.compareAndSet(false, true)) { + getRemoveStartedLatch.countDown(); + + X.println(">>> Started get/remove."); + } + } + + return true; + } + }, EVT_CACHE_OBJECT_SWAPPED); + + Collection<IgniteFuture<?>> futs = new ArrayList<>(3); + + long start = System.currentTimeMillis(); + + futs.add(doPut(g)); + + if (getRmvEnabled) + futs.addAll(doGetRemove(g)); + + wait(futs); + + X.println("Test finished in: " + (System.currentTimeMillis() - start)); + } + } + + /** Parses command line options into the static settings and prints the effective values; unknown options or parse errors print usage and exit. + * @param args Command line arguments. 
+ */ + private static void parseArgs(String[] args) { + try { + for (int i = 0; i < args.length; i++) { + String arg = args[i]; + + switch (arg) { + case "-k": + keyCnt = Integer.valueOf(args[++i]); break; + case "-r": + getRmvRatio = Float.valueOf(args[++i]); break; + case "-pt": + putThreadCnt = Integer.valueOf(args[++i]); break; + case "-gt": + getThreadCnt = Integer.valueOf(args[++i]); break; + case "-rt": + rmvThreadCnt = Integer.valueOf(args[++i]); break; + case "-dgr": + getRmvEnabled = false; break; + default: + usage(); + } + } + } + catch (Exception e) { + e.printStackTrace(); + + usage(); + } + + X.println(">>>"); + X.println(">>> Key count: " + keyCnt); + X.println(">>> Get/remove ratio: " + getRmvRatio); + X.println(">>> Put threads count: " + putThreadCnt); + X.println(">>> Get threads count: " + getThreadCnt); + X.println(">>> Remove threads count: " + rmvThreadCnt); + X.println(">>> Get/remove " + (getRmvEnabled ? "enabled" : "disabled") + "."); + X.println(">>>"); + } + + /** Prints the usage message and exits the JVM with status 1. */ + private static void usage() { + X.println(">>>"); + X.println(">>> Usage: swaploadtest.sh -k <number of keys> -r <get/remove ratio> -pt <number of put threads>"); + X.println(">>> -gt <number of get threads> -rt <number of remove threads> -dgr"); + X.println(">>>"); + X.println(">>> -dgr disables get/remove threads."); + X.println(">>>"); + X.println(">>> All arguments are optional."); + X.println(">>>"); + + System.exit(1); + } + + /** Starts {@code putThreadCnt} threads that put keys 1..keyCnt (value equals key) into the default cache of {@code g}, sharing one key counter. + * @return Future. 
+ */ + private static IgniteFuture<?> doPut(final Ignite g) { + final AtomicInteger putKey = new AtomicInteger(0); + + return GridTestUtils.runMultiThreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + GridCache<Integer, Integer> cache = g.cache(null); + + assert cache != null; + + while (true) { + int i = putKey.incrementAndGet(); + + if (i % LOG_MOD == 0) + X.println(">>> Put count: " + i); + + if (i > keyCnt) + break; + + cache.putx(i, i); + } + + X.println(">>> Thread '" + Thread.currentThread().getName() + "' stopped."); + } + }, putThreadCnt, "put-thread"); + } + + /** Starts the get threads and the remove threads; both groups wait for the start latch and then take keys from the shared swapped-keys queue until key {@code keyCnt} is seen or the stop flag is set. NOTE(review): the stop flag is only checked after take() returns, so a thread blocked on an empty queue will not observe it; the null check after take() in the get loop looks redundant - confirm. + * @return Futures. + */ + private static Collection<IgniteFuture<Long>> doGetRemove(final Ignite g) { + final AtomicBoolean stop = new AtomicBoolean(false); + + return F.asList( + GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + getRemoveStartedLatch.await(); + + GridCache<Integer, Integer> cache = g.cache(null); + + assert cache != null; + + while (true) { + Integer i = swappedKeys.take(); + + if (i == null) + continue; + + Integer val = cache.get(i); + + assert val != null && val.equals(i); + + if (i % LOG_MOD == 0) + X.println(">>> Get/remove count: " + i); + + if (i == keyCnt || stop.get()) { + stop.set(true); + + break; + } + } + + X.println(">>> Thread '" + Thread.currentThread().getName() + "' stopped."); + + return null; + } + }, getThreadCnt, "get-thread"), + + GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + getRemoveStartedLatch.await(); + + GridCache<Integer, Integer> cache = g.cache(null); + + assert cache != null; + + while (true) { + Integer i = swappedKeys.take(); + + Integer val = cache.remove(i); + + assert val != null && val.equals(i); + + if (i % LOG_MOD == 0) + X.println(">>> Get/remove count: " + i); + + if (i == keyCnt || stop.get()) { + stop.set(true); + + break; + } + } + + 
X.println(">>> Thread '" + Thread.currentThread().getName() + "' stopped."); + + return null; + } + }, rmvThreadCnt, "remove-thread") + ); + } + + /** Blocks until every given future has completed. + * @param futs Futures. + */ + private static void wait(Iterable<IgniteFuture<?>> futs) { + F.forEach(futs, new CIX1<IgniteFuture<?>>() { + @Override public void applyx(IgniteFuture<?> fut) throws IgniteCheckedException { + fut.get(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheWriteBehindStoreLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheWriteBehindStoreLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheWriteBehindStoreLoadTest.java new file mode 100644 index 0000000..9887317 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheWriteBehindStoreLoadTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Write-behind cache store load test: put threads update the cache for the configured run time while put statistics are logged every minute. + */ +public class GridCacheWriteBehindStoreLoadTest extends GridCommonAbstractTest { + /** Write-behind flush frequency passed to the cache configuration. */ + private static final int WRITE_FROM_BEHIND_FLUSH_FREQUENCY = 1000; + + /** Run time is 24 hours, in milliseconds (24 h * 60 min * 60 s * 1000 ms). */ + private static final long runTime = 24L * 60 * 60 * 1000; + + /** Specify if test keys should be randomly generated. */ + private boolean rndKeys; + + /** Number of distinct keys if they are generated randomly. */ + private int keysCnt = 20 * 1024; + + /** Number of threads that concurrently update cache. */ + private int threadCnt; + + /** No-op cache store. */ + private static final CacheStore store = new CacheStoreAdapter() { + /** {@inheritDoc} */ + @Override public Object load(Object key) { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry e) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + // No-op. + } + }; + + /** + * Constructor; asks the base class to start the grid. + */ + public GridCacheWriteBehindStoreLoadTest() { + super(true /*start grid. */); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + GridCache<?, ?> cache = cache(); + + if (cache != null) + cache.clearAll(); + } + + /** + * @return Caching mode. 
+ */ + protected GridCacheMode cacheMode() { + return GridCacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override protected final IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration c = super.getConfiguration(); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(cacheMode()); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setSwapEnabled(false); + + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + cc.setWriteBehindEnabled(true); + cc.setWriteBehindFlushFrequency(WRITE_FROM_BEHIND_FLUSH_FREQUENCY); + + c.setCacheConfiguration(cc); + + return c; + } + + /** Runs the load with 10 put threads using sequentially increasing keys. + * @throws Exception If failed. + */ + public void testLoadCacheSequentialKeys() throws Exception { + rndKeys = false; + + threadCnt = 10; + + loadCache(); + } + + /** Runs the load with 10 put threads using random keys below {@code keysCnt}. + * @throws Exception If failed. + */ + public void testLoadCacheRandomKeys() throws Exception { + rndKeys = true; + + threadCnt = 10; + + loadCache(); + } + + /** Runs {@code threadCnt} put threads until the run time elapses, logging total, average and last-minute put rates once per minute. + * @throws Exception If failed. + */ + private void loadCache() throws Exception { + final AtomicBoolean running = new AtomicBoolean(true); + + final GridCache<Long, String> cache = cache(); + + final AtomicLong keyCntr = new AtomicLong(); + + long start = System.currentTimeMillis(); + + IgniteFuture<?> fut = multithreadedAsync(new Runnable() { + @SuppressWarnings({"NullableProblems"}) + @Override public void run() { + + Random rnd = new Random(); + + try { + while (running.get()) { + long putNum = keyCntr.incrementAndGet(); + + long key = rndKeys ? 
rnd.nextInt(keysCnt) : putNum; + + cache.put(key, "val" + key); + } + } + catch (IgniteCheckedException e) { + error("Unexpected exception in put thread", e); + + assert false; + } + } + }, threadCnt, "put"); + + long prevPutCnt = 0; + + while (System.currentTimeMillis() - start < runTime) { + // Print stats every minute. + U.sleep(60 * 1000); + + long cnt = keyCntr.get(); + long secondsElapsed = (System.currentTimeMillis() - start) / 1000; + + info(">>> Running for " + secondsElapsed + " seconds"); + info(">>> Puts: [total=" + cnt + ", avg=" + (cnt / secondsElapsed) + " (ops/sec), lastMinute=" + + ((cnt - prevPutCnt) / 60) + "(ops/sec)]"); + + prevPutCnt = cnt; + } + + running.set(false); + + fut.get(); + } + + /** + * @return Will return 0 to disable timeout. + */ + @Override protected long getTestTimeout() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/capacity/GridCapacityLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/capacity/GridCapacityLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/capacity/GridCapacityLoadTest.java new file mode 100644 index 0000000..da78a2d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/capacity/GridCapacityLoadTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.capacity; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; +import org.springframework.context.support.*; + +import java.lang.management.*; + +/** + * Cache capacity load test: stores 3,000,000 integer entries and prints heap usage and the average heap bytes used per entry. + */ +public class GridCapacityLoadTest { + /** Memory MX bean used to sample heap usage. */ + private static final MemoryMXBean mem = ManagementFactory.getMemoryMXBean(); + + /** + * Loads the grid configuration from Spring XML, fills the default cache and prints heap statistics. + * + * @param args Parameters. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + // Initialize Spring factory. + ClassPathXmlApplicationContext ctx = + new ClassPathXmlApplicationContext("org/apache/ignite/loadtests/capacity/spring-capacity-cache.xml"); + + IgniteConfiguration cfg = (IgniteConfiguration)ctx.getBean("grid.cfg"); + + try (Ignite g = G.start(cfg)) { + GridCache<Integer, Integer> c = g.cache(null); + + long init = mem.getHeapMemoryUsage().getUsed(); + + printHeap(init); + + int cnt = 0; + + for (; cnt < 3000000; cnt++) { + c.put(cnt, cnt); + + if (cnt % 10000 == 0) { + X.println("Stored count: " + cnt); + + printHeap(init); + + if (cnt > 2100000 && cnt % 100000 == 0) + System.gc(); + } + } + + System.gc(); + + Thread.sleep(1000); + + printHeap(init); + + MemoryUsage heap = mem.getHeapMemoryUsage(); + + long used = heap.getUsed() - init; + + long entrySize = cnt > 0 ? 
used / cnt : 0; + + X.println("Average entry size: " + entrySize); + } + } + + /** Prints the maximum heap minus the currently used heap, in megabytes; {@code init} is subtracted from both terms and so cancels out. */ private static void printHeap(long init) { + MemoryUsage heap = mem.getHeapMemoryUsage(); + + long max = heap.getMax() - init; + long used = heap.getUsed() - init; + long left = max - used; + + X.println("Heap left: " + (left / (1024 * 1024)) + "MB"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/capacity/spring-capacity-cache.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/capacity/spring-capacity-cache.xml b/modules/core/src/test/java/org/apache/ignite/loadtests/capacity/spring-capacity-cache.xml new file mode 100644 index 0000000..1b9bf75 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/capacity/spring-capacity-cache.xml @@ -0,0 +1,124 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!-- + GridGain Spring configuration file to startup grid cache. 
+ + When starting a standalone GridGain node, you need to execute the following command: + {GRIDGAIN_HOME}/bin/ggstart.{bat|sh} path-to-this-file/spring-capacity-cache.xml + + When starting GridGain from Java IDE, pass path to this file into GridGain: + GridGain.start("path-to-this-file/spring-capacity-cache.xml"); +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> + <!-- + Optional description. + --> + <description> + Spring file for grid configuration with benchmark. + </description> + + <!-- + Configuration below demonstrates how to set up caches within grid nodes. + --> + <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + <property name="deploymentMode" value="SHARED"/> + + <!-- Set to local host address just for examples. --> + <property name="localHost" value="127.0.0.1"/> + + <property name="cacheConfiguration"> + <!-- + Specify list of cache configurations here. Any property from + CacheConfiguration interface can be configured here. + Note that absolutely all configuration properties are optional. + --> + <list> + <!-- + Partitioned cache example configuration. + --> + <bean class="org.apache.ignite.cache.CacheConfiguration"> + <property name="cacheMode" value="PARTITIONED"/> + + <!-- Initial cache size. --> + <property name="startSize" value="10000000"/> + + <!-- + Setting this to FULL_SYNC will cause local node to wait for remote commits. + --> + <property name="writeSynchronizationMode" value="FULL_ASYNC"/> + + <property name="distributionMode" value="PARTITIONED_ONLY"/> + + <!-- Get rid of value byte buffers once not needed. --> + <property name="storeValueBytes" value="false"/> + + <!-- + This shows how to configure number of backups. The below configuration + sets the number of backups to 0 to ensure maximum capacity. 
+ --> + <property name="affinity"> + <bean class="org.apache.ignite.cache.affinity.consistenthash.GridCacheConsistentHashAffinityFunction"> + <property name="keyBackups" value="0"/> <!-- No backups. --> + </bean> + </property> + + <!-- Set synchronous preloading (default is asynchronous). --> + <property name="preloadMode" value="SYNC"/> + </bean> + </list> + </property> + + <!-- + Uncomment this to provide TCP discovery SPI (predefined addresses). + Use the addresses list to provide IP addresses of initial nodes in the grid + (at least one address must be provided). + --> + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <property name="addresses"> + <list> + <!-- + List all IP/port configurations that potentially + can be started first in examples. We are assuming + grid of size 10 or less. + --> + <value>127.0.0.1:47500</value> + <value>127.0.0.1:47501</value> + <value>127.0.0.1:47502</value> + <value>127.0.0.1:47503</value> + <value>127.0.0.1:47504</value> + <value>127.0.0.1:47505</value> + <value>127.0.0.1:47506</value> + <value>127.0.0.1:47507</value> + <value>127.0.0.1:47508</value> + <value>127.0.0.1:47509</value> + </list> + </property> + </bean> + </property> + </bean> + </property> + </bean> +</beans> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestCacheStore.java b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestCacheStore.java new file mode 100644 index 0000000..9c75f9a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestCacheStore.java @@ -0,0 +1,134 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.colocation; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.jdk8.backport.*; + +import javax.cache.*; +import javax.cache.integration.*; +import java.util.concurrent.*; + +/** + * Accenture cache store. + */ +public class GridTestCacheStore extends CacheStoreAdapter<GridTestKey, Long> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** + * Preload data from store. In this case we just auto-generate random values. + * + * @param clo Callback for every key. + * @param args Optional arguments. + */ + @Override public void loadCache(final IgniteBiInClosure<GridTestKey, Long> clo, Object... args) { + // Number of threads is passed in as argument by caller. 
+ final int numThreads = (Integer)args[0]; + int entryCnt = (Integer)args[1]; + + log.info("Number of load threads: " + numThreads); + log.info("Number of cache entries to load: " + entryCnt); + + ExecutorService execSvc = Executors.newFixedThreadPool(numThreads); + + try { + ExecutorCompletionService<Object> completeSvc = new ExecutorCompletionService<>(execSvc); + + GridCache<GridTestKey, Long> cache = ignite.cache("partitioned"); + + assert cache != null; + + // Get projection just to check affinity for Integer. + final GridCacheProjection<Integer, Long> prj = cache.projection(Integer.class, Long.class); + + final LongAdder adder = new LongAdder(); + + for (int i = 0; i < numThreads; i++) { + final int threadId = i; + + final int perThreadKeys = entryCnt / numThreads; + + final int mod = entryCnt % numThreads; + + completeSvc.submit(new Callable<Object>() { + @Override public Object call() throws Exception { + int start = threadId * perThreadKeys; + int end = start + perThreadKeys; + + if (threadId + 1 == numThreads) + end += mod; + + for (long i = start; i < end; i++) { + if (prj.cache().affinity().mapKeyToNode(GridTestKey.affinityKey(i)).isLocal()) { // Only add if key is local. + clo.apply(new GridTestKey(i), i); + + adder.increment(); + } + + if (i % 10000 == 0) + log.info("Loaded " + adder.intValue() + " keys."); + } + + return null; + } + }); + } + + // Wait for threads to complete. + for (int i = 0; i < numThreads; i++) { + try { + completeSvc.take().get(); + } + catch (InterruptedException | ExecutionException e) { + throw new CacheLoaderException(e); + } + } + + // Final print out. + log.info("Loaded " + adder.intValue() + " keys."); + } + finally { + execSvc.shutdown(); + } + } + + /** {@inheritDoc} */ + @Override public Long load(GridTestKey key) { + return null; // No-op. + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends GridTestKey, ? extends Long> e) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestConstants.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestConstants.java b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestConstants.java new file mode 100644 index 0000000..7ca173e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestConstants.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.colocation; + +/** + * Set of constants for this project. + */ +public class GridTestConstants { + /** Number of modulo regions to partition keys into. */ + public static final int MOD_COUNT = 1024; + + /** Number of entries to put in cache. */ + public static final int ENTRY_COUNT = 2000000; + + /** Cache init size - add some padding to avoid resizing. 
*/ + public static final int CACHE_INIT_SIZE = (int)(1.5 * ENTRY_COUNT); + + /** Number of threads to load cache. */ + public static final int LOAD_THREADS = Runtime.getRuntime().availableProcessors() * 2; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestKey.java b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestKey.java new file mode 100644 index 0000000..bb399aa --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestKey.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.colocation; + +import org.apache.ignite.cache.affinity.*; + +import java.io.*; + +/** + * Accenture key. + */ +public class GridTestKey implements Externalizable { + private long id; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridTestKey() { + // No-op. 
+ } + + public GridTestKey(long id) { + this.id = id; + } + + public long getId() { + return id; + } + + @GridCacheAffinityKeyMapped + public int affinityKey() { + return affinityKey(id); + } + + public static int affinityKey(long id) { + return (int)(id % GridTestConstants.MOD_COUNT); + } + + /** + * Implement {@link Externalizable} for faster serialization. This is + * optional and you can simply implement {@link Serializable}. + * + * @param in Input. + * @throws IOException If failed. + */ + @Override public void readExternal(ObjectInput in) throws IOException { + id = in.readLong(); + } + + /** + * Implement {@link Externalizable} for faster serialization. This is + * optional and you can simply implement {@link Serializable}. + * + * @param out Output. + * @throws IOException If failed. + */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(id); + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + + GridTestKey key = (GridTestKey)o; + + return id == key.id; + } + + @Override public int hashCode() { + return (int)(id ^ (id >>> 32)); + } + + @Override public String toString() { + return "AccentureKey [id=" + id + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestLifecycleBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestLifecycleBean.java b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestLifecycleBean.java new file mode 100644 index 0000000..bd011f4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestLifecycleBean.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.colocation; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.resources.*; + +/** + * Lifecycle bean. + */ +public class GridTestLifecycleBean implements LifecycleBean { + @IgniteInstanceResource + private Ignite g; + + @Override public void onLifecycleEvent(LifecycleEventType type) throws IgniteCheckedException { + if (type == LifecycleEventType.AFTER_GRID_START) { + GridCache<GridTestKey, Long> cache = g.cache("partitioned"); + + assert cache != null; + + cache.loadCache(null, 0, GridTestConstants.LOAD_THREADS, GridTestConstants.ENTRY_COUNT); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java new file mode 100644 index 0000000..4d8539c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.colocation; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.thread.*; +import org.apache.ignite.internal.util.typedef.*; +import org.springframework.beans.factory.*; +import org.springframework.context.support.*; + +import java.util.concurrent.*; + +/** + * Accenture collocated example. + */ +public class GridTestMain { + /** + * Main method. + * + * @param args Parameters. + * @throws IgniteCheckedException If failed. + */ + public static void main(String[] args) throws Exception { + BeanFactory ctx = new ClassPathXmlApplicationContext("org/apache/ignite/loadtests/colocation/spring-colocation.xml"); + + // Initialize Spring factory. + try (Ignite g = G.start((IgniteConfiguration)ctx.getBean("grid.cfg"))) { + final GridCache<GridTestKey, Long> cache = g.cache("partitioned"); + + assert cache != null; + + // Uncomment if you plan to load cache using AccentureCacheStore. + // generateAndLoad(); + + // Uncomment if you plan to load cache from cache store. + // Note that you could also do this automatically from lifecycle bean. 
+ // To configure lifecycle bean, uncomment 'lifecycleBeans' property in + // spring-accenture.xml file. + loadFromStore(cache); + + X.println("Number of entries in cache: " + cache.size()); + + colocateJobs(); + //localPoolRun(); + } + } + + /** + * @throws Exception If failed. + */ + private static void colocateJobs() throws Exception { + X.println("Collocating jobs..."); + + Ignite g = G.ignite(); + + final GridCache<GridTestKey, Long> cache = g.cache("partitioned"); + + final BlockingQueue<IgniteFuture> q = new ArrayBlockingQueue<>(400); + + long start = System.currentTimeMillis(); + + IgniteCompute comp = g.compute().enableAsync(); + + // Collocate computations and data. + for (long i = 0; i < GridTestConstants.ENTRY_COUNT; i++) { + final long key = i; + + comp.affinityRun("partitioned", GridTestKey.affinityKey(key), new Runnable() { + // This code will execute on remote nodes by collocating keys with cached data. + @Override public void run() { + Long val = cache.peek(new GridTestKey(key)); + + if (val == null || val != key) + throw new RuntimeException("Invalid value found [key=" + key + ", val=" + val + ']'); + } + }); + + final IgniteFuture<?> f = comp.future(); + + q.put(f); + + f.listenAsync(new CI1<IgniteFuture<?>>() { + @Override public void apply(IgniteFuture<?> o) { + q.poll(); + } + }); + + if (i % 10000 == 0) + X.println("Executed jobs: " + i); + } + + long end = System.currentTimeMillis(); + + X.println("Executed " + GridTestConstants.ENTRY_COUNT + " computations in " + (end - start) + "ms."); + } + + /** + * + */ + private static void localPoolRun() { + X.println("Local thread pool run..."); + + ExecutorService exe = new IgniteThreadPoolExecutor(400, 400, 0, new ArrayBlockingQueue<Runnable>(400) { + @Override public boolean offer(Runnable runnable) { + try { + put(runnable); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + + return true; + } + }); + + long start = System.currentTimeMillis(); + + final GridCache<GridTestKey, 
Long> cache = G.ignite().cache("partitioned"); + + // Collocate computations and data. + for (long i = 0; i < GridTestConstants.ENTRY_COUNT; i++) { + final long key = i; + + exe.submit(new Runnable() { + @Override public void run() { + Long val = cache.peek(new GridTestKey(key)); + + if (val == null || val != key) + throw new RuntimeException("Invalid value found [key=" + key + ", val=" + val + ']'); + } + }); + + if (i % 10000 == 0) + X.println("Executed jobs: " + i); + } + + long end = System.currentTimeMillis(); + + X.println("Executed " + GridTestConstants.ENTRY_COUNT + " computations in " + (end - start) + "ms."); + } + + /** + * Load cache from data store. Also take a look at + * {@link GridTestCacheStore#loadAll} method. + * + * @param cache Cache to load. + * @throws IgniteCheckedException If failed. + */ + private static void loadFromStore(GridCache<GridTestKey, Long> cache) throws IgniteCheckedException { + cache.loadCache(null, 0, GridTestConstants.LOAD_THREADS, GridTestConstants.ENTRY_COUNT); + } + + /** + * Generates and loads data directly through cache API using data loader. + * This method is provided as example and is not called directly because + * data is loaded through {@link GridTestCacheStore} store. + * + * @throws Exception If failed. 
+ */ + private static void generateAndLoad() throws Exception { + int numThreads = Runtime.getRuntime().availableProcessors() * 2; + + ExecutorCompletionService<Object> execSvc = + new ExecutorCompletionService<>(Executors.newFixedThreadPool(numThreads)); + + try (IgniteDataLoader<GridTestKey, Long> ldr = G.ignite().dataLoader("partitioned")) { + for (int i = 0; i < numThreads; i++) { + final int threadId = i; + + final int perThreadKeys = GridTestConstants.ENTRY_COUNT / numThreads; + + execSvc.submit(new Callable<Object>() { + @Override public Object call() throws Exception { + int start = threadId * perThreadKeys; + int end = start + perThreadKeys; + + for (long i = start; i < end; i++) + ldr.addData(new GridTestKey(i), i); + + return null; + } + }); + } + + for (int i = 0; i < numThreads; i++) + execSvc.take().get(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/spring-colocation.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/spring-colocation.xml b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/spring-colocation.xml new file mode 100644 index 0000000..d4b39c5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/spring-colocation.xml @@ -0,0 +1,182 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!-- + GridGain Spring configuration file to startup grid cache. + + When starting a standalone GridGain node, you need to execute the following command: + {GRIDGAIN_HOME}/bin/ggstart.{bat|sh} path-to-this-file/example-cache.xml + + When starting GridGain from Java IDE, pass path to this file into GridGain: + GridGain.start("path-to-this-file/example-benchmark.xml"); +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation="http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/util + http://www.springframework.org/schema/util/spring-util.xsd"> +<!-- + Optional description. + --> + <description> + Spring file for grid configuration with benchmark. + </description> + + <!-- + Configuration below demonstrates how to setup caches within grid nodes. + --> + <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + <property name="deploymentMode" value="SHARED"/> + + <!-- Set to local host address just for examples. 
--> + <property name="localHost" value="127.0.0.1"/> + + <property name="marshalLocalJobs" value="false"/> + + <property name="collisionSpi"> + <bean class="org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi"> + <property name="parallelJobsNumber"><util:constant static-field="java.lang.Integer.MAX_VALUE"/></property> + </bean> + </property> + + <!-- + Uncomment to provide custom configuration for executor service. + By default thread pool size is 100. + All threads are pre-started and are available for use. + --> + <property name="executorService"> + <bean class="org.apache.ignite.thread.IgniteThreadPoolExecutor"> + <constructor-arg type="int" value="400"/> + <constructor-arg type="int" value="400"/> + <constructor-arg type="long"> + <util:constant static-field="java.lang.Long.MAX_VALUE"/> + </constructor-arg> + <constructor-arg type="java.util.concurrent.BlockingQueue"> + <bean class="java.util.concurrent.LinkedBlockingQueue"/> + </constructor-arg> + </bean> + </property> + + <!-- + Uncomment to provide custom configuration for System executor service. + By default the thread pool size is 5 which should be good enough. + Threads are not started unless used. + --> + <property name="systemExecutorService"> + <bean class="org.apache.ignite.thread.IgniteThreadPoolExecutor"> + <constructor-arg type="int" value="400"/> + <constructor-arg type="int" value="400"/> + <constructor-arg type="long"> + <util:constant static-field="java.lang.Long.MAX_VALUE"/> + </constructor-arg> + <constructor-arg type="java.util.concurrent.BlockingQueue"> + <bean class="java.util.concurrent.LinkedBlockingQueue"/> + </constructor-arg> + </bean> + </property> + + <!-- + Uncomment if you plan to populate cache from lifecycle bean. + --> + <!-- + <property name="lifecycleBeans"> + <bean class="org.apache.ignite.loadtests.colocation.GridTestLifecycleBean"/> + </property> + --> + + <property name="cacheConfiguration"> + <!-- + Specify list of cache configurations here. 
Any property from + CacheConfiguration interface can be configured here. + Note that absolutely all configuration properties are optional. + --> + <list> + <!-- + Partitioned cache example configuration. + --> + <bean class="org.apache.ignite.cache.CacheConfiguration"> + <property name="name" value="partitioned"/> + + <property name="cacheMode" value="PARTITIONED"/> + + <!-- Initial cache size. --> + <property name="startSize"> + <util:constant static-field="org.apache.ignite.loadtests.colocation.GridTestConstants.CACHE_INIT_SIZE"/> + </property> + + <!-- + This shows how to configure number of backups. The below configuration + sets the number of backups to 0 (backups are disabled). + --> + <property name="affinity"> + <bean class="org.apache.ignite.cache.affinity.consistenthash.GridCacheConsistentHashAffinityFunction"> + <property name="keyBackups" value="0"/> <!-- Disable backups. --> + </bean> + </property> + + <!-- Set synchronous preloading (default is asynchronous). --> + <property name="preloadMode" value="SYNC"/> + + <property name="distributionMode" value="PARTITIONED_ONLY"/> + + <property name="queryIndexEnabled" value="false"/> + + <property name="store"> + <bean class="org.apache.ignite.loadtests.colocation.GridTestCacheStore"/> + </property> + </bean> + </list> + </property> + + <!-- + Uncomment this to provide TCP discovery SPI (predefined addresses). + Use the addresses list to provide IP addresses of initial nodes in the grid + (at least one address must be provided). + --> + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <property name="addresses"> + <list> + <!-- + List all IP/port configurations that potentially + can be started first in examples. We are assuming + grid of size 10 or less. 
+ --> + <value>127.0.0.1:47500</value> + <value>127.0.0.1:47501</value> + <value>127.0.0.1:47502</value> + <value>127.0.0.1:47503</value> + <value>127.0.0.1:47504</value> + <value>127.0.0.1:47505</value> + <value>127.0.0.1:47506</value> + <value>127.0.0.1:47507</value> + <value>127.0.0.1:47508</value> + <value>127.0.0.1:47509</value> + </list> + </property> + </bean> + </property> + </bean> + </property> + </bean> +</beans> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java new file mode 100644 index 0000000..b5a2b15 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.communication; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.loadtests.util.*; +import org.gridgain.testframework.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; +import static org.gridgain.testframework.GridLoadTestUtils.*; + +/** + * By default this benchmarks uses original GridGain configuration + * with message dispatching from NIO threads. + * + * By changing {@link #DFLT_CONFIG} constant you can use ForkJoin thread pool instead of JDK default. + * + * Note that you should run 2 processes of this test to get it running. 
+ */ +public class GridIoManagerBenchmark { + /** */ + public static final String DFLT_CONFIG = "modules/tests/config/io-manager-benchmark.xml"; + + /** */ + private static final int DFLT_THREADS = 2; + + /** */ + private static final long WARM_UP_DUR = 30 * 1000; + + /** */ + private static final Semaphore sem = new Semaphore(10 * 1024); + + /** */ + public static final int TEST_TOPIC = 1; + + /** */ + private static final LongAdder msgCntr = new LongAdder(); + + /** */ + private static final Map<IgniteUuid, CountDownLatch> latches = new ConcurrentHashMap8<>(); + + /** */ + private static final byte[][] arrs; + + /** */ + private static boolean testHeavyMsgs; + + /** */ + private static boolean testLatency; + + /** + * + */ + static { + ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); + + arrs = new byte[64][]; + + for (int i = 0; i < arrs.length; i++) { + byte[] arr = new byte[rnd.nextInt(4096, 8192)]; + + for (int j = 0; j < arr.length; j++) + arr[j] = (byte)rnd.nextInt(0, 127); + + arrs[i] = arr; + } + } + + /** + * @param args Command line arguments. + * @throws IgniteCheckedException If failed. + */ + public static void main(String[] args) throws IgniteCheckedException { + int threads = args.length > 0 ? Integer.parseInt(args[0]) : DFLT_THREADS; + int duration = args.length > 1 ? Integer.parseInt(args[1]) : 0; + String outputFilename = args.length > 2 ? args[2] : null; + String path = args.length > 3 ? 
args[3] : DFLT_CONFIG; + testHeavyMsgs = args.length > 4 && "true".equalsIgnoreCase(args[4]); + testLatency = args.length > 5 && "true".equalsIgnoreCase(args[5]); + +// threads = 128; +// testLatency = true; +// testHeavyMsgs = true; + + X.println("Config: " + path); + X.println("Test heavy messages: " + testHeavyMsgs); + X.println("Test latency: " + testLatency); + X.println("Threads: " + threads); + X.println("Duration: " + duration); + X.println("Output file name: " + outputFilename); + + GridKernal g = (GridKernal)G.start(path); + + if (g.localNode().order() > 1) { + try { + sendMessages(g, threads, duration, outputFilename); + } + finally { + G.stopAll(false); + } + } + else + receiveMessages(g); + } + + /** + * @param g Kernal. + * @param threads Number of send threads. + * @param duration Test duration. + * @param outputFilename Output file name. + */ + @SuppressWarnings("deprecation") + private static void sendMessages(GridKernal g, int threads, int duration, @Nullable final String outputFilename) { + X.println(">>> Sending messages."); + + g.context().io().addMessageListener(TEST_TOPIC, new SenderMessageListener()); + + Thread collector = startDaemon(new Runnable() { + @Override public void run() { + final long initTs = System.currentTimeMillis(); + long ts = initTs; + long queries = msgCntr.sum(); + GridCumulativeAverage qpsAvg = new GridCumulativeAverage(); + + try { + while (!Thread.currentThread().isInterrupted()) { + U.sleep(10000); + + long newTs = System.currentTimeMillis(); + long newQueries = msgCntr.sum(); + + long executed = newQueries - queries; + long time = newTs - ts; + + long qps = executed * 1000 / time; + + boolean recordAvg = ts - initTs > WARM_UP_DUR; + + if (recordAvg) qpsAvg.update(qps); + + X.println("Communication benchmark [qps=" + qps + (recordAvg ? 
", qpsAvg=" + qpsAvg : "") + + ", executed=" + executed + ", time=" + time + ']'); + + ts = newTs; + queries = newQueries; + } + } + catch (IgniteInterruptedException ignored) { + // No-op. + } + + X.println("Average QPS: " + qpsAvg); + + if (outputFilename != null) { + try { + X.println("Saving results to output file: " + outputFilename); + + appendLineToFile(outputFilename, "%s,%d", GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date + ()), qpsAvg.get()); + } + catch (IOException e) { + X.println("Failed to record results to a file: " + e.getMessage()); + } + } + } + }); + + Collection<SendThread> sndThreads = new ArrayList<>(threads); + + for (int i = 0; i < threads; i++) { + SendThread t = new SendThread(g); + + sndThreads.add(t); + + t.start(); + } + + try { + U.sleep(duration > 0 ? duration * 1000 + WARM_UP_DUR : Long.MAX_VALUE); + } + catch (IgniteInterruptedException ignored) { + // No-op. + } + + collector.interrupt(); + + for (SendThread t : sndThreads) + t.interrupt(); + } + + /** + * @param g Kernal. + */ + @SuppressWarnings("deprecation") + private static void receiveMessages(final GridKernal g) { + X.println(">>> Receiving messages."); + + final GridIoManager io = g.context().io(); + + GridMessageListener lsnr = new GridMessageListener() { + private ClusterNode node; + + @Override public void onMessage(UUID nodeId, Object msg) { + if (node == null) + node = g.context().discovery().node(nodeId); + + GridTestMessage testMsg = ((GridTestMessage)msg); + + testMsg.bytes(null); + + try { + io.send(node, TEST_TOPIC, testMsg, PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + } + }; + + io.addMessageListener(TEST_TOPIC, lsnr); + } + + /** + * + */ + private static class SendThread extends Thread { + /** */ + private final GridKernal g; + + /** + * @param g Kernal. 
+ */ + SendThread(GridKernal g) { + this.g = g; + } + + /** {@inheritDoc} */ + @Override public void run() { + try { + ClusterNode dst = awaitOther(g.context().discovery()); + + GridIoManager io = g.context().io(); + + Random rnd = ThreadLocalRandom8.current(); + + IgniteUuid msgId = IgniteUuid.randomUuid(); + + while (!Thread.interrupted()) { + CountDownLatch latch = null; + + if (testLatency) + latches.put(msgId, latch = new CountDownLatch(1)); + else + sem.acquire(); + + io.send( + dst, + TEST_TOPIC, + new GridTestMessage(msgId, testHeavyMsgs ? arrs[rnd.nextInt(arrs.length)] : null), + PUBLIC_POOL); + + if (testLatency && !latch.await(1000, MILLISECONDS)) + throw new RuntimeException("Failed to await latch."); + } + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + catch (InterruptedException ignored) { + // No-op. + } + } + + /** + * @param disc Discovery. + * @return Second node in the topology. + * @throws InterruptedException If interrupted. + */ + @SuppressWarnings("BusyWait") + private ClusterNode awaitOther(final GridDiscoveryManager disc) throws InterruptedException { + while (disc.allNodes().size() < 2) + Thread.sleep(1000); + + for (ClusterNode node : disc.allNodes()) + if (!F.eqNodes(node, disc.localNode())) + return node; + + assert false; + + return null; + } + } + + /** + * + */ + private static class SenderMessageListener implements GridMessageListener { + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + msgCntr.increment(); + + if (testLatency) + latches.get(((GridTestMessage)msg).id()).countDown(); + else + sem.release(); + } + } +}