http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/clients/src/test/java/org/gridgain/loadtests/client/GridClientTcpSslLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/gridgain/loadtests/client/GridClientTcpSslLoadTest.java b/modules/clients/src/test/java/org/gridgain/loadtests/client/GridClientTcpSslLoadTest.java deleted file mode 100644 index 41cf857..0000000 --- a/modules/clients/src/test/java/org/gridgain/loadtests/client/GridClientTcpSslLoadTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.client; - -import org.apache.ignite.client.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -/** - * Makes a long run to ensure stability and absence of memory leaks. - */ -public class GridClientTcpSslLoadTest extends GridClientTcpSslMultiThreadedSelfTest { - /** Test duration. */ - private static final long TEST_RUN_TIME = 8 * 60 * 60 * 1000; - - /** Statistics output interval. */ - private static final long STATISTICS_PRINT_INTERVAL = 5 * 60 * 1000; - - /** Time to let connections closed by idle. */ - private static final long RELAX_INTERVAL = 60 * 1000; - - /** - * @throws Exception If failed. - */ - public void testLongRun() throws Exception { - long start = System.currentTimeMillis(); - - long lastPrint = start; - - do { - clearCaches(); - - testMultithreadedTaskRun(); - - testMultithreadedCachePut(); - - long now = System.currentTimeMillis(); - - if (now - lastPrint > STATISTICS_PRINT_INTERVAL) { - info(">>>>>>> Running test for " + ((now - start) / 1000) + " seconds."); - - lastPrint = now; - } - - // Let idle check work. - U.sleep(RELAX_INTERVAL); - } - while (System.currentTimeMillis() - start < TEST_RUN_TIME); - } - - /** {@inheritDoc} */ - @Override protected int topologyRefreshFrequency() { - return 5000; - } - - /** {@inheritDoc} */ - @Override protected int maxConnectionIdleTime() { - return topologyRefreshFrequency() / 5; - } - - /** - * Clears caches on all nodes. - */ - @SuppressWarnings("ConstantConditions") - private void clearCaches() { - for (int i = 0; i < NODES_CNT; i++) - grid(i).cache(PARTITIONED_CACHE_NAME).clearAll(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/config/io-manager-benchmark.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/io-manager-benchmark.xml b/modules/core/src/test/config/io-manager-benchmark.xml index a2a8574..f08bf59 100644 --- a/modules/core/src/test/config/io-manager-benchmark.xml +++ b/modules/core/src/test/config/io-manager-benchmark.xml @@ -39,7 +39,7 @@ <property name="classNames"> <list> - <value>org.gridgain.loadtests.communication.GridTestMessage</value> + <value>org.apache.ignite.loadtests.communication.GridTestMessage</value> </list> </property> </bean> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/config/jobs-load-base.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/jobs-load-base.xml b/modules/core/src/test/config/jobs-load-base.xml index 18c8e60..a4f0947 100644 --- a/modules/core/src/test/config/jobs-load-base.xml +++ b/modules/core/src/test/config/jobs-load-base.xml @@ -33,7 +33,7 @@ <bean class="org.apache.ignite.marshaller.optimized.IgniteOptimizedMarshaller"> <property name="classNames"> <list> - <value>org.gridgain.loadtests.job.GridJobExecutionLoadTestJob</value> + <value>org.apache.ignite.loadtests.job.GridJobExecutionLoadTestJob</value> </list> </property> </bean> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/config/load/dsi-load-base.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/load/dsi-load-base.xml b/modules/core/src/test/config/load/dsi-load-base.xml index ebde53c..1e5ba3d 100644 --- a/modules/core/src/test/config/load/dsi-load-base.xml +++ b/modules/core/src/test/config/load/dsi-load-base.xml @@ -36,13 +36,13 @@ <bean class="org.apache.ignite.marshaller.optimized.IgniteOptimizedMarshaller"> <property name="classNames"> <list> - <value>org.gridgain.loadtests.dsi.GridDsiSession</value> - <value>org.gridgain.loadtests.dsi.GridDsiPerfJob</value> - <value>org.gridgain.loadtests.dsi.GridDsiRequestTask</value> - <value>org.gridgain.loadtests.dsi.GridDsiResponseTask</value> - <value>org.gridgain.loadtests.dsi.GridDsiRequest</value> - <value>org.gridgain.loadtests.dsi.GridDsiResponse</value> - <value>org.gridgain.loadtests.dsi.GridDsiMessage</value> + <value>org.apache.ignite.loadtests.dsi.GridDsiSession</value> + <value>org.apache.ignite.loadtests.dsi.GridDsiPerfJob</value> + <value>org.apache.ignite.loadtests.dsi.GridDsiRequestTask</value> + <value>org.apache.ignite.loadtests.dsi.GridDsiResponseTask</value> + <value>org.apache.ignite.loadtests.dsi.GridDsiRequest</value> + <value>org.apache.ignite.loadtests.dsi.GridDsiResponse</value> + <value>org.apache.ignite.loadtests.dsi.GridDsiMessage</value> </list> </property> </bean> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/config/load/dsi-load-server.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/load/dsi-load-server.xml b/modules/core/src/test/config/load/dsi-load-server.xml index 2078fda..ad03f19 100644 --- a/modules/core/src/test/config/load/dsi-load-server.xml +++ b/modules/core/src/test/config/load/dsi-load-server.xml @@ -78,6 +78,6 @@ </bean> <util:list id="lifecycleBeans"> - <bean class="org.gridgain.loadtests.dsi.GridDsiLifecycleBean"/> + <bean class="org.apache.ignite.loadtests.dsi.GridDsiLifecycleBean"/> </util:list> </beans> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml index 2a69b3c..a8fea8c 100644 --- a/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml +++ b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml @@ -47,7 +47,7 @@ <list> <bean class="org.apache.ignite.streamer.index.tree.StreamerTreeIndexProvider"> <property name="updater"> - <bean class="org.gridgain.loadtests.streamer.IndexUpdater"/> + <bean class="org.apache.ignite.loadtests.streamer.IndexUpdater"/> </property> </bean> </list> @@ -56,7 +56,7 @@ </property> <property name="stages"> <list> - <bean class="org.gridgain.loadtests.streamer.average.TestStage"/> + <bean class="org.apache.ignite.loadtests.streamer.average.TestStage"/> </list> </property> <property name="maximumConcurrentSessions" value="1000"/> @@ -69,11 +69,11 @@ <!-- Load closures. --> - <bean class="org.gridgain.loadtests.streamer.GridStreamerLoad"> + <bean class="org.apache.ignite.loadtests.streamer.GridStreamerLoad"> <property name="closures"> <list> - <bean class="org.gridgain.loadtests.streamer.EventClosure" /> - <bean class="org.gridgain.loadtests.streamer.QueryClosure" /> + <bean class="org.apache.ignite.loadtests.streamer.EventClosure" /> + <bean class="org.apache.ignite.loadtests.streamer.QueryClosure" /> </list> </property> </bean> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheLoadPopulationTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheLoadPopulationTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheLoadPopulationTask.java new file mode 100644 index 0000000..a545acb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheLoadPopulationTask.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * + */ +public class GridCacheLoadPopulationTask extends ComputeTaskSplitAdapter<Void, Void> { + /** Serial version UID. */ + private static final long serialVersionUID = 1L; + + /** {@inheritDoc} */ + @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, Void arg) throws IgniteCheckedException { + Collection<ChunkPopulationJob> jobs = new ArrayList<>(); + + int maxElements = 10000; + int currStartElement = 0; + + while (currStartElement < GridCacheMultiNodeLoadTest.ELEMENTS_COUNT) { + jobs.add(new ChunkPopulationJob(currStartElement, maxElements)); + + currStartElement += maxElements; + } + + return jobs; + } + + /** + * Chunk population job. + */ + private static class ChunkPopulationJob implements ComputeJob { + /** Serial version UID. */ + private static final long serialVersionUID = 1L; + + /** Start element index. */ + private int startElementIdx; + + /** Mex elements. */ + private int maxElements; + + /** Injected grid. */ + @IgniteInstanceResource + private Ignite g; + + /** + * Creates chunk population job. + * + * @param startElementIdx Start element index. + * @param maxElements Max elements. + */ + ChunkPopulationJob(int startElementIdx, int maxElements) { + this.startElementIdx = startElementIdx; + this.maxElements = maxElements; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "ConstantConditions"}) + @Override public Object execute() throws IgniteCheckedException { + Map<Object, TestValue> map = new TreeMap<>(); + + for (int i = startElementIdx; i < startElementIdx + maxElements; i++) { + if (i >= GridCacheMultiNodeLoadTest.ELEMENTS_COUNT) + break; + + Object key = UUID.randomUUID(); + + map.put(key, new TestValue(key, i)); + } + + g.log().info("Putting values to partitioned cache [nodeId=" + g.cluster().localNode().id() + ", mapSize=" + + map.size() + ']'); + + g.cache(GridCacheMultiNodeLoadTest.CACHE_NAME).putAll(map); + + return null; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } + } +} + +/** + * Test value. + */ +@SuppressWarnings("ClassNameDiffersFromFileName") +class TestValue { + /** Value key. */ + private Object key; + + /** Value data. */ + private String someData; + + /** + * Constructs test value. + * + * @param key Key. + * @param id Data. + */ + TestValue(Object key, Object id) { + this.key = key; + someData = key + "_" + id + "_" + System.currentTimeMillis(); + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(TestValue.class, this); + } + + /** + * @return Key. + */ + public Object key() { + return key; + } + + /** + * @return Value data. + */ + public String someData() { + return someData; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java new file mode 100644 index 0000000..07c135d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Multi-node cache test. + */ +public class GridCacheMultiNodeLoadTest extends GridCommonAbstractTest { + /** Cache name. */ + public static final String CACHE_NAME = "partitioned"; + + /** Elements count. */ + public static final int ELEMENTS_COUNT = 200000; + + /** Grid 1. */ + private static Ignite ignite1; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName(CACHE_NAME); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setDistributionMode(PARTITIONED_ONLY); + cacheCfg.setSwapEnabled(false); + cacheCfg.setStartSize(10); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + + cacheCfg.setEvictionPolicy(new GridCacheLruEvictionPolicy(100000)); + cacheCfg.setBackups(1); + + cacheCfg.setPreloadMode(SYNC); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite1 = startGrid(1); + startGrid(2); + + ignite1.cache(CACHE_NAME); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + ignite1 = null; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } + + /** + * @throws Exception If test failed. + */ + public void testMany() throws Exception { + ignite1.compute().execute(GridCacheLoadPopulationTask.class, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheAbstractLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheAbstractLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheAbstractLoadTest.java new file mode 100644 index 0000000..cd55332 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheAbstractLoadTest.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.log4j.*; +import org.apache.log4j.varia.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.logger.*; +import org.jetbrains.annotations.*; +import org.springframework.beans.*; +import org.springframework.context.*; +import org.springframework.context.support.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Common stuff for cache load tests. + */ +abstract class GridCacheAbstractLoadTest { + /** Random. */ + protected static final Random RAND = new Random(); + + /** Default configuration file path. */ + protected static final String CONFIG_FILE = "modules/tests/config/spring-cache-load.xml"; + + /** Default log file path. */ + protected static final String LOG_FILE = "cache-load.log"; + + /** Whether to use transactions. */ + protected final boolean tx; + + /** Operations per transaction. */ + protected final int operationsPerTx; + + /** Transaction isolation level. */ + protected final IgniteTxIsolation isolation; + + /** Transaction concurrency control. */ + protected final IgniteTxConcurrency concurrency; + + /** Threads count. */ + protected final int threads; + + /** Write ratio. */ + protected final double writeRatio; + + /** Test duration. */ + protected final long testDuration; + + /** Value size. */ + protected final int valSize; + + /** */ + protected static final int WRITE_LOG_MOD = 100; + + /** */ + protected static final int READ_LOG_MOD = 1000; + + /** Reads. */ + protected final AtomicLong reads = new AtomicLong(); + + /** Reads. */ + protected final AtomicLong readTime = new AtomicLong(); + + /** Writes. */ + protected final AtomicLong writes = new AtomicLong(); + + /** Writes. */ + protected final AtomicLong writeTime = new AtomicLong(); + + /** Done flag. */ + protected final AtomicBoolean done = new AtomicBoolean(); + + /** */ + protected GridCacheAbstractLoadTest() { + Properties props = new Properties(); + + try { + props.load(new FileReader(GridTestUtils.resolveGridGainPath( + "modules/tests/config/cache-load.properties"))); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + tx = Boolean.valueOf(props.getProperty("transactions")); + operationsPerTx = Integer.valueOf(props.getProperty("operations.per.tx")); + isolation = IgniteTxIsolation.valueOf(props.getProperty("isolation")); + concurrency = IgniteTxConcurrency.valueOf(props.getProperty("concurrency")); + threads = Integer.valueOf(props.getProperty("threads")); + writeRatio = Double.valueOf(props.getProperty("write.ratio")); + testDuration = Long.valueOf(props.getProperty("duration")); + valSize = Integer.valueOf(props.getProperty("value.size")); + } + + /** + * @param writeClos Write closure. + * @param readClos ReadClosure. + */ + protected void loadTest(final CIX1<GridCacheProjection<Integer, Integer>> writeClos, + final CIX1<GridCacheProjection<Integer, Integer>> readClos) { + info("Read threads: " + readThreads()); + info("Write threads: " + writeThreads()); + info("Test duration (ms): " + testDuration); + + Ignite ignite = G.ignite(); + + final GridCache<Integer, Integer> cache = ignite.cache(null); + + assert cache != null; + + try { + IgniteFuture<?> f1 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + long start = System.currentTimeMillis(); + + while (!done.get()) { + if (tx) { + try (IgniteTx tx = cache.txStart()) { + writeClos.apply(cache); + + tx.commit(); + } + } + else + writeClos.apply(cache); + } + + writeTime.addAndGet(System.currentTimeMillis() - start); + + return null; + } + }, writeThreads(), "cache-load-test-worker"); + + IgniteFuture<?> f2 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + long start = System.currentTimeMillis(); + + while(!done.get()) { + if (tx) { + try (IgniteTx tx = cache.txStart()) { + readClos.apply(cache); + + tx.commit(); + } + } + else + readClos.apply(cache); + } + + readTime.addAndGet(System.currentTimeMillis() - start); + + return null; + } + }, readThreads(), "cache-load-test-worker"); + + Thread.sleep(testDuration); + + done.set(true); + + f1.get(); + f2.get(); + + info("Test stats: "); + info(" total-threads = " + threads); + info(" write-ratio = " + writeRatio); + info(" total-runs = " + (reads.get() + writes.get())); + info(" total-reads = " + reads); + info(" total-writes = " + writes); + info(" read-time (ms) = " + readTime); + info(" write-time (ms) = " + writeTime); + info(" avg-read-time (ms) = " + ((double)readTime.get() / reads.get())); + info(" avg-write-time (ms) = " + ((double)writeTime.get() / writes.get())); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * @return Write threads count. + */ + @SuppressWarnings({"ConstantConditions"}) + protected int writeThreads() { + int ratio = (int)(threads * writeRatio); + + return writeRatio == 0 ? 0 : ratio == 0 ? 1 : ratio; + } + + /** + * @return Read threads count. + */ + @SuppressWarnings({"ConstantConditions"}) + protected int readThreads() { + int ratio = (int)(threads * (1 - writeRatio)); + + return Double.compare(writeRatio, 1) == 0 ? 0 : ratio == 0 ? 1 : ratio; + } + + /** + * @param msg Message to print. + */ + protected static void info(String msg) { + System.out.println(msg); + } + + /** + * @param msg Message to print. + */ + protected static void error(String msg) { + System.err.println(msg); + } + + /** + * Initializes logger. + * + * @param log Log file name. + * @return Logger. + * @throws IgniteCheckedException If file initialization failed. + */ + protected IgniteLogger initLogger(String log) throws IgniteCheckedException { + Logger impl = Logger.getRootLogger(); + + impl.removeAllAppenders(); + + String fileName = U.getGridGainHome() + "/work/log/" + log; + + // Configure output that should go to System.out + RollingFileAppender fileApp; + + String fmt = "[%d{ABSOLUTE}][%-5p][%t][%c{1}] %m%n"; + + try { + fileApp = new RollingFileAppender(new PatternLayout(fmt), fileName); + + fileApp.setMaxBackupIndex(0); + fileApp.setAppend(false); + + // fileApp.rollOver(); + + fileApp.activateOptions(); + } + catch (IOException e) { + throw new IgniteCheckedException("Unable to initialize file appender.", e); + } + + LevelRangeFilter lvlFilter = new LevelRangeFilter(); + + lvlFilter.setLevelMin(Level.DEBUG); + + fileApp.addFilter(lvlFilter); + + impl.addAppender(fileApp); + + // Configure output that should go to System.out + ConsoleAppender conApp = new ConsoleAppender(new PatternLayout(fmt), ConsoleAppender.SYSTEM_OUT); + + lvlFilter = new LevelRangeFilter(); + + lvlFilter.setLevelMin(Level.DEBUG); + lvlFilter.setLevelMax(Level.INFO); + + conApp.addFilter(lvlFilter); + + conApp.activateOptions(); + + impl.addAppender(conApp); + + // Configure output that should go to System.err + conApp = new ConsoleAppender(new PatternLayout(fmt), ConsoleAppender.SYSTEM_ERR); + + conApp.setThreshold(Level.WARN); + + conApp.activateOptions(); + + impl.addAppender(conApp); + + impl.setLevel(Level.INFO); + + //Logger.getLogger("org.gridgain").setLevel(Level.INFO); + //Logger.getLogger(GridCacheVersionManager.class).setLevel(Level.DEBUG); + + return new GridTestLog4jLogger(false); + } + + /** + * Initializes configurations. + * + * @param springCfgPath Configuration file path. + * @param log Log file name. + * @return Configuration. + * @throws IgniteCheckedException If fails. + */ + @SuppressWarnings("unchecked") + protected IgniteConfiguration configuration(String springCfgPath, String log) throws IgniteCheckedException { + File path = GridTestUtils.resolveGridGainPath(springCfgPath); + + if (path == null) + throw new IgniteCheckedException("Spring XML configuration file path is invalid: " + new File(springCfgPath) + + ". Note that this path should be either absolute path or a relative path to GRIDGAIN_HOME."); + + if (!path.isFile()) + throw new IgniteCheckedException("Provided file path is not a file: " + path); + + // Add no-op logger to remove no-appender warning. + Appender app = new NullAppender(); + + Logger.getRootLogger().addAppender(app); + + ApplicationContext springCtx; + + try { + springCtx = new FileSystemXmlApplicationContext(path.toURI().toURL().toString()); + } + catch (BeansException | MalformedURLException e) { + throw new IgniteCheckedException("Failed to instantiate Spring XML application context: " + e.getMessage(), e); + } + + Map cfgMap; + + try { + // Note: Spring is not generics-friendly. + cfgMap = springCtx.getBeansOfType(IgniteConfiguration.class); + } + catch (BeansException e) { + throw new IgniteCheckedException("Failed to instantiate bean [type=" + IgniteConfiguration.class + ", err=" + + e.getMessage() + ']', e); + } + + if (cfgMap == null) + throw new IgniteCheckedException("Failed to find a single grid factory configuration in: " + path); + + // Remove previously added no-op logger. + Logger.getRootLogger().removeAppender(app); + + if (cfgMap.isEmpty()) + throw new IgniteCheckedException("Can't find grid factory configuration in: " + path); + else if (cfgMap.size() > 1) + throw new IgniteCheckedException("More than one configuration provided for cache load test: " + cfgMap.values()); + + IgniteConfiguration cfg = (IgniteConfiguration)cfgMap.values().iterator().next(); + + cfg.setGridLogger(initLogger(log)); + + cfg.getTransactionsConfiguration().setDefaultTxIsolation(isolation); + cfg.getTransactionsConfiguration().setDefaultTxConcurrency(concurrency); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheAffinityTransactionsOffHeapTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheAffinityTransactionsOffHeapTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheAffinityTransactionsOffHeapTest.java new file mode 100644 index 0000000..2c24752 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheAffinityTransactionsOffHeapTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMemoryMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + */ +public class GridCacheAffinityTransactionsOffHeapTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODE_CNT = 4; + + /** */ + private static final int THREAD_CNT = 1; + + /** */ + private static final int KEY_CNT = 10; + + /** + * @param args Command line arguments. + * @throws Exception In case of error. + */ + public static void main(String[] args) throws Exception { + startNodes(); + + for (int i = 0; i < KEY_CNT; i++) { + GridCache<Object, Integer> c = cache(i); + + c.putx((long)i, 0); + c.putx(new UserKey(i, 0), 0); + c.putx(new UserKey(i, 1), 0); + c.putx(new UserKey(i, 2), 0); + } + + assert cache(5).get(5L) != null; + + long key = 5; + + GridCache<Object, Integer> c = cache(key); + + try (IgniteTx tx = c.txStartAffinity(key, PESSIMISTIC, REPEATABLE_READ, 0, 0)) { + Integer val = c.get(key); + Integer userVal1 = c.get(new UserKey(key, 0)); + Integer userVal2 = c.get(new UserKey(key, 1)); + Integer userVal3 = c.get(new UserKey(key, 2)); + + assert val != null; + assert userVal1 != null; + assert userVal2 != null; + assert userVal3 != null; + + assert userVal1.equals(val); + assert userVal2.equals(val); + assert userVal3.equals(val); + + int newVal = val + 1; + + c.putx(key, newVal); + c.putx(new UserKey(key, 0), newVal); + c.putx(new UserKey(key, 1), newVal); + c.putx(new UserKey(key, 2), newVal); + + tx.commit(); + } + +// final AtomicLong txCnt = new AtomicLong(); +// +// GridTestUtils.runMultiThreaded( +// new Callable<Object>() { +// @Override public Object call() throws Exception { +// Random rnd = new Random(); +// +// while (!Thread.currentThread().isInterrupted()) { +// long key = rnd.nextInt(KEY_CNT); +// +// GridCache<Object, Integer> c = cache(key); +// +// try (GridCacheTx tx = c.txStartAffinity(key, PESSIMISTIC, REPEATABLE_READ, 0, 0)) { +// Integer val = c.get(key); +// Integer userVal1 = c.get(new UserKey(key, 0)); +// Integer userVal2 = c.get(new UserKey(key, 1)); +// Integer userVal3 = c.get(new UserKey(key, 2)); +// +// assert val != null; +// assert userVal1 != null; +// assert userVal2 != null; +// assert userVal3 != null; +// +// assert userVal1.equals(val); +// assert userVal2.equals(val); +// assert userVal3.equals(val); +// +// int newVal = val + 1; +// +// c.putx(key, newVal); +// c.putx(new UserKey(key, 0), newVal); +// c.putx(new UserKey(key, 1), newVal); +// c.putx(new UserKey(key, 2), newVal); +// +// tx.commit(); +// } +// +// long txDone = txCnt.incrementAndGet(); +// +// if (txDone % 1000 == 0) +// System.out.println("Transactions done: " + txDone); +// } +// +// return null; +// } +// }, +// THREAD_CNT, +// "test-thread" +// ); + } + + /** + * @param key Key. + * @return Cache. + */ + private static GridCache<Object, Integer> cache(long key) { + UUID id = Ignition.ignite("grid-0").cache(null).affinity().mapKeyToNode(key).id(); + + return Ignition.ignite(id).cache(null); + } + + /** + * @throws IgniteCheckedException In case of error. + */ + private static void startNodes() throws IgniteCheckedException { + for (int i = 0; i < NODE_CNT; i++) + Ignition.start(getConfiguration("grid-" + i)); + } + + /** + * @param name Grid name. + * @return Configuration. + */ + private static IgniteConfiguration getConfiguration(String name) { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(name); + + CacheConfiguration cacheCfg = new CacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setMemoryMode(OFFHEAP_TIERED); + cacheCfg.setOffHeapMaxMemory(0); + cacheCfg.setBackups(1); + + cfg.setCacheConfiguration(cacheCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** + */ + private static class UserKey implements Externalizable { + /** */ + @GridCacheAffinityKeyMapped + private long affKey; + + /** */ + private int idx; + + /** + */ + public UserKey() { + // No-op. + } + + /** + * @param affKey Affinity key. + * @param idx Index. + */ + private UserKey(long affKey, int idx) { + this.affKey = affKey; + this.idx = idx; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(affKey); + out.writeInt(idx); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + affKey = in.readLong(); + idx = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + UserKey key = (UserKey)o; + + return affKey == key.affKey && idx == key.idx; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = (int)(affKey ^ (affKey >>> 32)); + + result = 31 * result + idx; + + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheBenchmark.java new file mode 100644 index 0000000..ec8ff01 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheBenchmark.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Benchmark for cache {@code putx()} and {@code get()} operations. + */ +public class GridCacheBenchmark { + /** Warm up time. */ + public static final long WARM_UP_TIME = Long.getLong("TEST_WARMUP_TIME", 20000); + + /** Number of puts. */ + private static final long PUT_CNT = Integer.getInteger("TEST_PUT_COUNT", 3000000); + + /** Thread count. */ + private static final int THREADS = Integer.getInteger("TEST_THREAD_COUNT", 16); + + /** Test write or read operations. */ + private static boolean testWrite = Boolean.getBoolean("TEST_WRITE"); + + /** Cache name. */ + private static final String CACHE = "partitioned"; + + /** Counter. */ + private static final AtomicLong cntr = new AtomicLong(); + + /** */ + private static final int LOG_MOD = 500000; + + /** + * @param args Arguments. + * @throws Exception If failed. + */ + @SuppressWarnings("BusyWait") + public static void main(String[] args) throws Exception { + GridFileLock fileLock = GridLoadTestUtils.fileLock(); + + fileLock.lock(); + + try { + final String outputFileName = args.length > 0 ? args[0] : null; + + // try (Grid g = G.start("modules/core/src/test/config/load/cache-client-benchmark.xml")) { + try (Ignite g = G.start("modules/core/src/test/config/load/cache-benchmark.xml")) { + X.println("warmupTime=" + WARM_UP_TIME); + X.println("putCnt=" + PUT_CNT); + X.println("threadCnt=" + THREADS); + X.println("testWrite=" + testWrite); + + final GridCache<Long, Long> cache = g.cache(CACHE); + + assert cache != null; + + cntr.set(0); + + final AtomicLong opCnt = new AtomicLong(); + + X.println("Warming up (putx)..."); + + GridLoadTestUtils.runMultithreadedInLoop(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + long keyVal = cntr.incrementAndGet(); + + cache.putx(keyVal % 100000, keyVal); + + long ops = opCnt.incrementAndGet(); + + if (ops % LOG_MOD == 0) + X.println(">>> Performed " + ops + " operations."); + + return null; + } + }, THREADS, WARM_UP_TIME); + + cntr.set(0); + + opCnt.set(0); + + X.println("Warming up (get)..."); + + GridLoadTestUtils.runMultithreadedInLoop(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + long keyVal = cntr.incrementAndGet(); + + Long old = cache.get(keyVal % 100000); + + long ops = opCnt.incrementAndGet(); + + if (ops % LOG_MOD == 0) + X.println(">>> Performed " + ops + " operations, old=" + old + ", keyval=" + keyVal); + + return null; + } + }, THREADS, WARM_UP_TIME); + + cache.clearAll(); + + System.gc(); + + cntr.set(0); + + opCnt.set(0); + + X.println("Starting GridGain cache putx() benchmark..."); + + long durPutx = GridLoadTestUtils.measureTime(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + while (true) { + long keyVal = cntr.incrementAndGet(); + + if (keyVal >= PUT_CNT) + break; + + cache.putx(keyVal % 100000, keyVal); + + long ops = opCnt.incrementAndGet(); + + if (ops % LOG_MOD == 0) + X.println(">>> Performed " + ops + " operations."); + } + + return null; + } + }, THREADS); + + X.println(">>>"); + X.println(">> GridGain cache putx() benchmark results [duration=" + durPutx + " ms, tx/sec=" + + (opCnt.get() * 1000 / durPutx) + ", total=" + opCnt.get() + ']'); + X.println(">>>"); + + System.gc(); + + cntr.set(0); + + opCnt.set(0); + + X.println("Starting GridGain cache get() benchmark..."); + + long durGet = GridLoadTestUtils.measureTime(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + while (true) { + long keyVal = cntr.incrementAndGet(); + + if (keyVal >= PUT_CNT) + break; + + Long old = cache.get(keyVal % 100000); + + long ops = opCnt.incrementAndGet(); + + if (ops % LOG_MOD == 0) + X.println(">>> Performed " + ops + " operations, old=" + old + ", keyval=" + keyVal); + } + + return null; + } + }, THREADS); + + X.println(">>>"); + X.println(">> GridGain cache get() benchmark results [duration=" + durGet + " ms, tx/sec=" + + (opCnt.get() * 1000 / durGet) + ", total=" + opCnt.get() + ']'); + X.println(">>>"); + + if (outputFileName != null) + GridLoadTestUtils.appendLineToFile( + outputFileName, + "%s,%d,%d", + GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()), + durPutx, + durGet); + } + } + finally { + fileLock.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java new file mode 100644 index 0000000..b5a45cd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; + +/** + * Cache data structures load test. + */ +public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoadTest { + /** Atomic long name. */ + private static final String TEST_LONG_NAME = "test-atomic-long"; + + /** Atomic reference name. */ + private static final String TEST_REF_NAME = "test-atomic-ref"; + + /** Atomic sequence name. */ + private static final String TEST_SEQ_NAME = "test-atomic-seq"; + + /** Atomic stamped name. */ + private static final String TEST_STAMP_NAME = "test-atomic-stamp"; + + /** Queue name. */ + private static final String TEST_QUEUE_NAME = "test-queue"; + + /** Count down latch name. */ + private static final String TEST_LATCH_NAME = "test-latch"; + + /** Maximum added value. */ + private static final int MAX_INT = 1000; + + /** Count down latch initial count. */ + private static final int LATCH_INIT_CNT = 1000; + + /** */ + private static final boolean LONG = false; + + /** */ + private static final boolean REF = false; + + /** */ + private static final boolean SEQ = false; + + /** */ + private static final boolean STAMP = false; + + /** */ + private static final boolean QUEUE = false; + + /** */ + private static final boolean LATCH = true; + + /** */ + private GridCacheDataStructuresLoadTest() { + // No-op + } + + /** Atomic long write closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> longWriteClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheAtomicLong al = cache.cache().dataStructures().atomicLong(TEST_LONG_NAME, 0, true); + + for (int i = 0; i < operationsPerTx; i++) { + al.addAndGet(RAND.nextInt(MAX_INT)); + + long cnt = writes.incrementAndGet(); + + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } + } + }; + + /** Atomic long read closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> longReadClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheAtomicLong al = cache.cache().dataStructures().atomicLong(TEST_LONG_NAME, 0, true); + + for (int i = 0; i < operationsPerTx; i++) { + al.get(); + + long cnt = reads.incrementAndGet(); + + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } + } + }; + + /** Atomic reference write closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> refWriteClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheAtomicReference<Integer> ar = cache.cache().dataStructures().atomicReference(TEST_REF_NAME, + null, true); + + for (int i = 0; i < operationsPerTx; i++) { + ar.set(RAND.nextInt(MAX_INT)); + + long cnt = writes.incrementAndGet(); + + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } + } + }; + + /** Atomic reference read closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> refReadClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheAtomicReference<Integer> ar = cache.cache().dataStructures().atomicReference(TEST_REF_NAME, null, + true); + + for (int i = 0; i < operationsPerTx; i++) { + ar.get(); + + long cnt = reads.incrementAndGet(); + + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } + } + }; + + /** Atomic sequence write closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> seqWriteClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheAtomicSequence as = cache.cache().dataStructures().atomicSequence(TEST_SEQ_NAME, 0, true); + + for (int i = 0; i < operationsPerTx; i++) { + as.addAndGet(RAND.nextInt(MAX_INT) + 1); + + long cnt = writes.incrementAndGet(); + + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } + } + }; + + /** Atomic sequence read closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> seqReadClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheAtomicSequence as = cache.cache().dataStructures().atomicSequence(TEST_SEQ_NAME, 0, true); + + for (int i = 0; i < operationsPerTx; i++) { + as.get(); + + long cnt = reads.incrementAndGet(); + + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } + } + }; + + /** Atomic stamped write closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> stampWriteClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheAtomicStamped<Integer, Integer> as = cache.cache().dataStructures().atomicStamped(TEST_STAMP_NAME, + 0, 0, true); + + for (int i = 0; i < operationsPerTx; i++) { + as.set(RAND.nextInt(MAX_INT), RAND.nextInt(MAX_INT)); + + long cnt = writes.incrementAndGet(); + + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } + } + }; + + /** Atomic stamped read closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> stampReadClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheAtomicStamped<Integer, Integer> as = cache.cache().dataStructures().atomicStamped(TEST_STAMP_NAME, + 0, 0, true); + + for (int i = 0; i < operationsPerTx; i++) { + as.get(); + + long cnt = reads.incrementAndGet(); + + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } + } + }; + + /** Queue write closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> queueWriteClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheQueue<Integer> q = cache.cache().dataStructures().queue(TEST_QUEUE_NAME, 0, true, true); + + for (int i = 0; i < operationsPerTx; i++) { + q.put(RAND.nextInt(MAX_INT)); + + long cnt = writes.incrementAndGet(); + + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } + } + }; + + /** Queue read closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> queueReadClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheQueue<Integer> q = cache.cache().dataStructures().queue(TEST_QUEUE_NAME, 0, true, true); + + for (int i = 0; i < operationsPerTx; i++) { + q.peek(); + + long cnt = reads.incrementAndGet(); + + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } + } + }; + + /** Count down latch write closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> latchWriteClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheCountDownLatch l = cache.cache().dataStructures().countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, + true, true); + + for (int i = 0; i < operationsPerTx; i++) { + l.countDown(); + + long cnt = writes.incrementAndGet(); + + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } + } + }; + + /** Count down latch read closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> latchReadClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + GridCacheCountDownLatch l = cache.cache().dataStructures().countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, + true, true); + + for (int i = 0; i < operationsPerTx; i++) { + l.count(); + + long cnt = reads.incrementAndGet(); + + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } + } + }; + + /** + * @param args Arguments. + * @throws IgniteCheckedException In case of error. + */ + public static void main(String[] args) throws IgniteCheckedException { + System.setProperty(IgniteSystemProperties.GG_UPDATE_NOTIFIER, "false"); + + System.out.println("Starting master node [params=" + Arrays.toString(args) + ']'); + + String cfg = args.length >= 1 ? args[0] : CONFIG_FILE; + String log = args.length >= 2 ? args[1] : LOG_FILE; + + final GridCacheDataStructuresLoadTest test = new GridCacheDataStructuresLoadTest(); + + try (Ignite g = Ignition.start(test.configuration(cfg, log))) { + System.gc(); + + if (LONG) { + info("Testing atomic long..."); + + test.loadTest(test.longWriteClos, test.longReadClos); + } + + System.gc(); + + if (REF) { + info("Testing atomic reference..."); + + test.loadTest(test.refWriteClos, test.refReadClos); + } + + System.gc(); + + if (SEQ) { + info("Testing atomic sequence..."); + + test.loadTest(test.seqWriteClos, test.seqReadClos); + } + + System.gc(); + + if (STAMP) { + info("Testing atomic stamped..."); + + test.loadTest(test.stampWriteClos, test.stampReadClos); + } + + System.gc(); + + if (QUEUE) { + info("Testing queue..."); + + test.loadTest(test.queueWriteClos, test.queueReadClos); + } + + System.gc(); + + if (LATCH) { + info("Testing count down latch..."); + + test.loadTest(test.latchWriteClos, test.latchReadClos); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheGroupLockComparisonTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheGroupLockComparisonTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheGroupLockComparisonTest.java new file mode 100644 index 0000000..ee7000b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheGroupLockComparisonTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.IgniteTxIsolation.REPEATABLE_READ; + +/** + * Performance comparison between putAll and group lock. + * + */ +public class GridCacheGroupLockComparisonTest { + /** Batch size. */ + private static final int BATCH_SIZE = Integer.getInteger("TEST_BATCH_SIZE", 25000); + + /** Thread count. */ + private static final int THREADS = Integer.getInteger("TEST_THREAD_COUNT", 16); + + /** Cache name. */ + private static final String CACHE = "partitioned"; + + /** Total number of objects in cache. */ + private static final long OBJECT_CNT = Integer.getInteger("TEST_OBJECT_COUNT", 2000000); + + /** Counter. */ + private static final AtomicLong cntr = new AtomicLong(); + + /** */ + private static final int LOG_MOD = 50000; + + /** + * @param args Arguments. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + try (Ignite g = G.start("modules/tests/config/load/cache-benchmark.xml")) { + System.out.println("threadCnt=" + THREADS); + System.out.println("objectCnt=" + OBJECT_CNT); + System.out.println("batchSize=" + BATCH_SIZE); + + // Populate and warm-up. + gridGainGroupLock(g, OBJECT_CNT, THREADS); + + gridGainGroupLock(g, OBJECT_CNT, THREADS); + } + } + + /** + * @param ignite Grid. + * @param max Maximum cache size. + * @param threads Threads. + * @throws Exception If failed. + */ + private static void gridGainPutAll(Ignite ignite, final long max, int threads) throws Exception { + X.println(">>>"); + X.println(">>> Testing putAll"); + X.println(">>>"); + + final GridCache<GridCacheAffinityKey<Long>, Long> cache = ignite.cache(CACHE); + + assert cache != null; + + final AtomicLong opCnt = new AtomicLong(); + + cntr.set(0); + + final long start = System.currentTimeMillis(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + while (true) { + Map<GridCacheAffinityKey<Long>, Long> vals = + new HashMap<>(BATCH_SIZE); + + long start = cntr.getAndAdd(BATCH_SIZE); + + if (start >= max) + break; + + for (long i = start; i < start + BATCH_SIZE; i++) + vals.put(new GridCacheAffinityKey<>(i % 100000, start), i); + + cache.putAll(vals); + + long ops = opCnt.addAndGet(BATCH_SIZE); + + if (ops % LOG_MOD == 0) + X.println(">>> Performed " + ops + " operations."); + } + + return null; + } + }, threads, "load-worker"); + + long dur = System.currentTimeMillis() - start; + + X.println(">>>"); + X.println(">> putAll timed results [dur=" + dur + " ms, tx/sec=" + (opCnt.get() * 1000 / dur) + + ", total=" + opCnt.get() + ", duration=" + (dur + 500) / 1000 + "s]"); + X.println(">>>"); + } + + /** + * @param ignite Grid. + * @param max Maximum cache size. + * @param threads Threads. + * @throws Exception If failed. + */ + private static void gridGainGroupLock(Ignite ignite, final long max, int threads) throws Exception { + X.println(">>>"); + X.println(">>> Testing group lock"); + X.println(">>>"); + + final GridCache<GridCacheAffinityKey<Long>, Long> cache = ignite.cache(CACHE); + + assert cache != null; + + final AtomicLong opCnt = new AtomicLong(); + + cntr.set(0); + + final AtomicInteger range = new AtomicInteger(); + + final long start = System.currentTimeMillis(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + int affIdx = range.getAndIncrement(); + + String affKey = Thread.currentThread().getName(); + + long rangeCnt = OBJECT_CNT / THREADS; + + long base = affIdx * rangeCnt; + + X.println("Going to put vals in range [" + base + ", " + (base + rangeCnt - 1) + ']'); + + long key = 0; + + while (true) { + long total = cntr.getAndAdd(BATCH_SIZE); + + if (total >= max) + break; + + // Threads should not lock the same key. + + try (IgniteTx tx = cache.txStartAffinity(affKey, PESSIMISTIC, REPEATABLE_READ, 0, BATCH_SIZE)) { + for (long i = 0; i < BATCH_SIZE; i++) { + cache.put(new GridCacheAffinityKey<>((key % rangeCnt) + base, affKey), i); + + key++; + } + + tx.commit(); + } + + long ops = opCnt.addAndGet(BATCH_SIZE); + + if (ops % LOG_MOD == 0) + X.println(">>> Performed " + ops + " operations."); + } + + return null; + } + }, threads, "load-worker"); + + long dur = System.currentTimeMillis() - start; + + X.println(">>>"); + X.println(">>> Cache size: " + cache.size()); + X.println(">>> Group lock timed results [dur=" + dur + " ms, tx/sec=" + (opCnt.get() * 1000 / dur) + + ", total=" + opCnt.get() + ", duration=" + (dur + 500) / 1000 + "s]"); + X.println(">>>"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheLoadTest.java new file mode 100644 index 0000000..2689672 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheLoadTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Cache load test. + */ +public final class GridCacheLoadTest extends GridCacheAbstractLoadTest { + /** Memory test. */ + private static final boolean MEMORY = false; + + /** Load test. */ + private static final boolean LOAD = true; + + /** */ + private static final int KEY_RANGE = 1000; + + /** */ + private GridCacheLoadTest() { + // No-op + } + + /** Write closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> writeClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + for (int i = 0; i < operationsPerTx; i++) { + int kv = RAND.nextInt(KEY_RANGE); + + assert cache.putx(kv, kv); + + long cnt = writes.incrementAndGet(); + + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes"); + } + } + }; + + /** Read closure. */ + private final CIX1<GridCacheProjection<Integer, Integer>> readClos = + new CIX1<GridCacheProjection<Integer, Integer>>() { + @Override public void applyx(GridCacheProjection<Integer, Integer> cache) + throws IgniteCheckedException { + for (int i = 0; i < operationsPerTx; i++) { + int k = RAND.nextInt(KEY_RANGE); + + Integer v = cache.get(k); + + if (v != null && !v.equals(k)) + error("Invalid value [k=" + k + ", v=" + v + ']'); + + long cnt = reads.incrementAndGet(); + + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads"); + } + } + }; + + /** + * @return New byte array. + */ + private byte[] newArray() { + byte[] bytes = new byte[valSize]; + + // Populate one byte. + bytes[RAND.nextInt(valSize)] = 1; + + return bytes; + } + + /** + * + */ + @SuppressWarnings({"ErrorNotRethrown", "InfiniteLoopStatement"}) + private void memoryTest() { + Ignite ignite = G.ignite(); + + final GridCache<Integer, byte[]> cache = ignite.cache(null); + + assert cache != null; + + final AtomicInteger cnt = new AtomicInteger(); + + try { + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Object call() throws Exception { + while (true) { + int idx; + + cache.putx(idx = cnt.getAndIncrement(), newArray()); + + if (idx % 1000 == 0) + info("Stored '" + idx + "' objects in cache [cache-size=" + cache.keySet().size() + ']'); + } + } + }, threads, "memory-test-worker"); + } + catch (OutOfMemoryError ignore) { + info("Populated '" + cnt.get() + "' 1K objects into cache [cache-size=" + cache.keySet().size() + ']'); + } + catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * @param args Command line. + * @throws Exception If fails. + */ + public static void main(String[] args) throws Exception { + System.setProperty(IgniteSystemProperties.GG_UPDATE_NOTIFIER, "false"); + + System.out.println("Starting master node [params=" + Arrays.toString(args) + ']'); + + String cfg = args.length >= 1 ? args[0] : CONFIG_FILE; + String log = args.length >= 2 ? args[1] : LOG_FILE; + + final GridCacheLoadTest test = new GridCacheLoadTest(); + + try (Ignite g = Ignition.start(test.configuration(cfg, log))) { + System.gc(); + + if (LOAD) + test.loadTest(test.writeClos, test.readClos); + + G.ignite().cache(null).clearAll(); + + System.gc(); + + if (MEMORY) + test.memoryTest(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCachePutRemoveLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCachePutRemoveLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCachePutRemoveLoadTest.java new file mode 100644 index 0000000..f75cdee --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCachePutRemoveLoadTest.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.cache; + +import com.beust.jcommander.*; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.tostring.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMemoryMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * The benchmark that performs put and remove operations on the cache to identify memory leaks. + * <P> + * Run this class with needed parameters. Type '-help' to get the list of the available parameters. + */ +public class GridCachePutRemoveLoadTest { + /** */ + private final Arguments args; + + /** */ + private GridCache<Object, Object> cache; + + /** + * @param args Arguments. + */ + public GridCachePutRemoveLoadTest(Arguments args) { + this.args = args; + } + + /** + * @param a Arguments. + */ + public static void main(String[] a) { + Arguments args = new Arguments(); + + JCommander jCommander = new JCommander(); + + jCommander.setAcceptUnknownOptions(true); + jCommander.addObject(args); + + jCommander.parse(a); + + if (args.help()) { + jCommander.usage(); + + return; + } + + X.println(args.toString()); + + GridCachePutRemoveLoadTest test = new GridCachePutRemoveLoadTest(args); + + try { + test.startNodes(); + + test.runTest(); + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + G.stopAll(true); + } + } + + /** + * @throws Exception If failed. + */ + protected void startNodes() throws Exception { + for (int i = 0; i < args.nodes(); i++) { + IgniteConfiguration cfg = + GridGainEx.loadConfiguration("modules/core/src/test/config/spring-cache-put-remove-load.xml").get1(); + + assert cfg != null; + + cfg.setGridName("g" + i); + + CacheConfiguration cacheCfg = cfg.getCacheConfiguration()[0]; + + GridCacheDistributionMode distro = i == 0 && + args.distribution() == CLIENT_ONLY ? CLIENT_ONLY : PARTITIONED_ONLY; + + cacheCfg.setCacheMode(args.cache()); + cacheCfg.setDistributionMode(distro); + cacheCfg.setWriteSynchronizationMode(args.synchronization()); + cacheCfg.setAtomicWriteOrderMode(args.orderMode()); + + if (cacheCfg.getCacheMode() == GridCacheMode.PARTITIONED) + cacheCfg.setBackups(args.backups()); + + if (args.isOffHeap()) { + cacheCfg.setOffHeapMaxMemory(0); + + if (args.isOffheapValues()) + cacheCfg.setMemoryMode(OFFHEAP_VALUES); + } + + cacheCfg.setAtomicityMode(args.transactional() ? TRANSACTIONAL : ATOMIC); + + if (args.evictionEnabled()) + cacheCfg.setEvictionPolicy(new GridCacheLruEvictionPolicy(1000)); + + G.start(cfg); + } + + Ignite g = G.ignite("g0"); + + assert g != null; + + cache = g.cache("cache"); + + assert cache != null; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("BusyWait") + private void runTest() throws Exception { + X.println(">>>"); + X.println(">>> Running test."); + X.println(">>>"); + + final AtomicLong putNum = new AtomicLong(); + + final AtomicLong rmvNum = new AtomicLong(); + + Thread timer = new Thread(new Runnable() { + @Override public void run() { + try { + while (!Thread.currentThread().isInterrupted()) { + long rmv = rmvNum.get(); + + long put = putNum.get(); + + if (args.evictionEnabled()) + X.println("Put: " + put); + else + X.println("Put: " + put + ", removed: " + rmv); + + Thread.sleep(5000); + } + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + + timer.setDaemon(true); + timer.start(); + + int queueSize = 100000; + + final BlockingQueue<Long> queue = new ArrayBlockingQueue<>(queueSize); + + if (!args.evictionEnabled()) { + Thread rmvThread = new Thread(new Runnable() { + @Override public void run() { + try { + for (long i = 0; i < Long.MAX_VALUE; i++) { + Long key = queue.take(); + + cache.removex(key); + + rmvNum.set(key); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + }, "rmvThread"); + + rmvThread.start(); + } + + for (long i = 0; i < Long.MAX_VALUE; i++) { + cache.putx(i, i); + + putNum.set(i); + + if (!args.evictionEnabled()) { + // Wait for queue to be empty if remove operation is slower than put operation. + if (!queue.offer(i)) { + while (!queue.isEmpty()) + Thread.sleep(1000); + + X.println("Waited for the remover thread to empty the queue."); + + queue.offer(i); + } + } + } + } + + /** + * + */ + private static class Arguments { + /** Main arguments (arguments without prefix '-') fall here. */ + @Parameter(description = "Main arguments") + @GridToStringExclude + private Iterable<String> mainArgs = new ArrayList<>(); + + /** */ + @Parameter(names = "-n", description = "Nodes") + private int nodes = 1; + + /** */ + @Parameter(names = "-cm", description = "Cache Mode") + private GridCacheMode cacheMode = GridCacheMode.PARTITIONED; + + /** */ + @Parameter(names = "-sm", description = "Synchronization Mode") + private GridCacheWriteSynchronizationMode syncMode = GridCacheWriteSynchronizationMode.PRIMARY_SYNC; + + /** */ + @Parameter(names = "-wo", description = "Write Ordering Mode") + private GridCacheAtomicWriteOrderMode orderMode = GridCacheAtomicWriteOrderMode.CLOCK; + + /** */ + @Parameter(names = "-dm", description = "Distribution mode") + private GridCacheDistributionMode distroMode = PARTITIONED_ONLY; + + /** */ + @Parameter(names = "-ot", description = "Tiered Offheap") + private boolean offheapTiered; + + /** */ + @Parameter(names = "-ov", description = "Offheap Values Only") + private boolean offheapVals; + + /** */ + @Parameter(names = "-b", description = "Backups") + private int backups; + + /** */ + @Parameter(names = "-tx", description = "Whether transactional cache is used or not") + private boolean tx; + + /** */ + @Parameter(names = "-ee", description = "Eviction Enabled") + private boolean evictionEnabled; + + /** */ + @Parameter(names = "-help", description = "Print this help message") + private boolean help; + + /** + * @return If help requested. + */ + public boolean help() { + return help; + } + + /** + * @return Distribution. + */ + public GridCacheDistributionMode distribution() { + return distroMode; + } + + /** + * @return Cache Mode. + */ + public GridCacheMode cache() { + return cacheMode; + } + + /** + * @return Synchronization. + */ + public GridCacheWriteSynchronizationMode synchronization() { + return syncMode; + } + + /** + * @return Cache write ordering mode. + */ + public GridCacheAtomicWriteOrderMode orderMode() { + return orderMode; + } + + /** + * @return Backups. + */ + public int backups() { + return backups; + } + + /** + * @return Offheap tiered. + */ + public boolean isOffheapTiered() { + return offheapTiered; + } + + /** + * @return Offheap values. + */ + public boolean isOffheapValues() { + return offheapVals; + } + + /** + * @return {@code True} if any offheap is enabled. + */ + public boolean isOffHeap() { + return offheapTiered || offheapVals; + } + + /** + * @return Nodes. + */ + public int nodes() { + return nodes; + } + + /** + * @return Whether transactional cache is used or not. + */ + public boolean transactional() { + return tx; + } + + /** + * @return Eviction enabled. + */ + public boolean evictionEnabled() { + return evictionEnabled; + } + + /** + * @return Main arguments. + */ + public Iterable<String> mainArgs() { + return mainArgs; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Arguments.class, this); + } + } +}