http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestKey.java b/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestKey.java deleted file mode 100644 index 38cbad5..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestKey.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.colocation; - -import org.apache.ignite.cache.affinity.*; - -import java.io.*; - -/** - * Accenture key. - */ -public class GridTestKey implements Externalizable { - private long id; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridTestKey() { - // No-op. 
- } - - public GridTestKey(long id) { - this.id = id; - } - - public long getId() { - return id; - } - - @GridCacheAffinityKeyMapped - public int affinityKey() { - return affinityKey(id); - } - - public static int affinityKey(long id) { - return (int)(id % GridTestConstants.MOD_COUNT); - } - - /** - * Implement {@link Externalizable} for faster serialization. This is - * optional and you can simply implement {@link Serializable}. - * - * @param in Input. - * @throws IOException If failed. - */ - @Override public void readExternal(ObjectInput in) throws IOException { - id = in.readLong(); - } - - /** - * Implement {@link Externalizable} for faster serialization. This is - * optional and you can simply implement {@link Serializable}. - * - * @param out Output. - * @throws IOException If failed. - */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(id); - } - - @Override public boolean equals(Object o) { - if (this == o) - return true; - - GridTestKey key = (GridTestKey)o; - - return id == key.id; - } - - @Override public int hashCode() { - return (int)(id ^ (id >>> 32)); - } - - @Override public String toString() { - return "AccentureKey [id=" + id + ']'; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestLifecycleBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestLifecycleBean.java b/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestLifecycleBean.java deleted file mode 100644 index e2bae1a..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestLifecycleBean.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.colocation; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.lifecycle.*; -import org.apache.ignite.resources.*; - -/** - * Lifecycle bean. 
- */ -public class GridTestLifecycleBean implements LifecycleBean { - @IgniteInstanceResource - private Ignite g; - - @Override public void onLifecycleEvent(LifecycleEventType type) throws IgniteCheckedException { - if (type == LifecycleEventType.AFTER_GRID_START) { - GridCache<GridTestKey, Long> cache = g.cache("partitioned"); - - assert cache != null; - - cache.loadCache(null, 0, GridTestConstants.LOAD_THREADS, GridTestConstants.ENTRY_COUNT); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestMain.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestMain.java b/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestMain.java deleted file mode 100644 index 91fd8d9..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/colocation/GridTestMain.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.loadtests.colocation; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.thread.*; -import org.apache.ignite.internal.util.typedef.*; -import org.springframework.beans.factory.*; -import org.springframework.context.support.*; - -import java.util.concurrent.*; - -/** - * Accenture collocated example. - */ -public class GridTestMain { - /** - * Main method. - * - * @param args Parameters. - * @throws IgniteCheckedException If failed. - */ - public static void main(String[] args) throws Exception { - BeanFactory ctx = new ClassPathXmlApplicationContext("org/gridgain/loadtests/colocation/spring-colocation.xml"); - - // Initialize Spring factory. - try (Ignite g = G.start((IgniteConfiguration)ctx.getBean("grid.cfg"))) { - final GridCache<GridTestKey, Long> cache = g.cache("partitioned"); - - assert cache != null; - - // Uncomment if you plan to load cache using AccentureCacheStore. - // generateAndLoad(); - - // Uncomment if you plan to load cache from cache store. - // Note that you could also do this automatically from lifecycle bean. - // To configure lifecycle bean, uncomment 'lifecycleBeans' property in - // spring-accenture.xml file. - loadFromStore(cache); - - X.println("Number of entries in cache: " + cache.size()); - - colocateJobs(); - //localPoolRun(); - } - } - - /** - * @throws Exception If failed. - */ - private static void colocateJobs() throws Exception { - X.println("Collocating jobs..."); - - Ignite g = G.ignite(); - - final GridCache<GridTestKey, Long> cache = g.cache("partitioned"); - - final BlockingQueue<IgniteFuture> q = new ArrayBlockingQueue<>(400); - - long start = System.currentTimeMillis(); - - IgniteCompute comp = g.compute().enableAsync(); - - // Collocate computations and data. 
- for (long i = 0; i < GridTestConstants.ENTRY_COUNT; i++) { - final long key = i; - - comp.affinityRun("partitioned", GridTestKey.affinityKey(key), new Runnable() { - // This code will execute on remote nodes by collocating keys with cached data. - @Override public void run() { - Long val = cache.peek(new GridTestKey(key)); - - if (val == null || val != key) - throw new RuntimeException("Invalid value found [key=" + key + ", val=" + val + ']'); - } - }); - - final IgniteFuture<?> f = comp.future(); - - q.put(f); - - f.listenAsync(new CI1<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> o) { - q.poll(); - } - }); - - if (i % 10000 == 0) - X.println("Executed jobs: " + i); - } - - long end = System.currentTimeMillis(); - - X.println("Executed " + GridTestConstants.ENTRY_COUNT + " computations in " + (end - start) + "ms."); - } - - /** - * - */ - private static void localPoolRun() { - X.println("Local thread pool run..."); - - ExecutorService exe = new IgniteThreadPoolExecutor(400, 400, 0, new ArrayBlockingQueue<Runnable>(400) { - @Override public boolean offer(Runnable runnable) { - try { - put(runnable); - } - catch (InterruptedException e) { - e.printStackTrace(); - } - - return true; - } - }); - - long start = System.currentTimeMillis(); - - final GridCache<GridTestKey, Long> cache = G.ignite().cache("partitioned"); - - // Collocate computations and data. - for (long i = 0; i < GridTestConstants.ENTRY_COUNT; i++) { - final long key = i; - - exe.submit(new Runnable() { - @Override public void run() { - Long val = cache.peek(new GridTestKey(key)); - - if (val == null || val != key) - throw new RuntimeException("Invalid value found [key=" + key + ", val=" + val + ']'); - } - }); - - if (i % 10000 == 0) - X.println("Executed jobs: " + i); - } - - long end = System.currentTimeMillis(); - - X.println("Executed " + GridTestConstants.ENTRY_COUNT + " computations in " + (end - start) + "ms."); - } - - /** - * Load cache from data store. 
Also take a look at - * {@link GridTestCacheStore#loadAll} method. - * - * @param cache Cache to load. - * @throws IgniteCheckedException If failed. - */ - private static void loadFromStore(GridCache<GridTestKey, Long> cache) throws IgniteCheckedException { - cache.loadCache(null, 0, GridTestConstants.LOAD_THREADS, GridTestConstants.ENTRY_COUNT); - } - - /** - * Generates and loads data directly through cache API using data loader. - * This method is provided as example and is not called directly because - * data is loaded through {@link GridTestCacheStore} store. - * - * @throws Exception If failed. - */ - private static void generateAndLoad() throws Exception { - int numThreads = Runtime.getRuntime().availableProcessors() * 2; - - ExecutorCompletionService<Object> execSvc = - new ExecutorCompletionService<>(Executors.newFixedThreadPool(numThreads)); - - try (IgniteDataLoader<GridTestKey, Long> ldr = G.ignite().dataLoader("partitioned")) { - for (int i = 0; i < numThreads; i++) { - final int threadId = i; - - final int perThreadKeys = GridTestConstants.ENTRY_COUNT / numThreads; - - execSvc.submit(new Callable<Object>() { - @Override public Object call() throws Exception { - int start = threadId * perThreadKeys; - int end = start + perThreadKeys; - - for (long i = start; i < end; i++) - ldr.addData(new GridTestKey(i), i); - - return null; - } - }); - } - - for (int i = 0; i < numThreads; i++) - execSvc.take().get(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/colocation/spring-colocation.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/colocation/spring-colocation.xml b/modules/core/src/test/java/org/gridgain/loadtests/colocation/spring-colocation.xml deleted file mode 100644 index 0278dfa..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/colocation/spring-colocation.xml +++ 
/dev/null @@ -1,182 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> - -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - -<!-- - GridGain Spring configuration file to startup grid cache. - - When starting a standalone GridGain node, you need to execute the following command: - {GRIDGAIN_HOME}/bin/ggstart.{bat|sh} path-to-this-file/example-cache.xml - - When starting GridGain from Java IDE, pass path to this file into GridGain: - GridGain.start("path-to-this-file/example-benchmark.xml"); ---> -<beans xmlns="http://www.springframework.org/schema/beans" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns:util="http://www.springframework.org/schema/util" - xsi:schemaLocation="http://www.springframework.org/schema/beans - http://www.springframework.org/schema/beans/spring-beans.xsd - http://www.springframework.org/schema/util - http://www.springframework.org/schema/util/spring-util.xsd"> -<!-- - Optional description. - --> - <description> - Spring file for grid configuration with benchmark. - </description> - - <!-- - Configuration below demonstrates how to setup caches within grid nodes. 
- --> - <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> - <property name="deploymentMode" value="SHARED"/> - - <!-- Set to local host address just for examples. --> - <property name="localHost" value="127.0.0.1"/> - - <property name="marshalLocalJobs" value="false"/> - - <property name="collisionSpi"> - <bean class="org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi"> - <property name="parallelJobsNumber"><util:constant static-field="java.lang.Integer.MAX_VALUE"/></property> - </bean> - </property> - - <!-- - Uncomment to provide custom configuration for executor service. - By default thread pool size is 100. - All threads are pre-started and are available for use. - --> - <property name="executorService"> - <bean class="org.apache.ignite.thread.IgniteThreadPoolExecutor"> - <constructor-arg type="int" value="400"/> - <constructor-arg type="int" value="400"/> - <constructor-arg type="long"> - <util:constant static-field="java.lang.Long.MAX_VALUE"/> - </constructor-arg> - <constructor-arg type="java.util.concurrent.BlockingQueue"> - <bean class="java.util.concurrent.LinkedBlockingQueue"/> - </constructor-arg> - </bean> - </property> - - <!-- - Uncomment to provide custom configuration for System executor service. - By default the thread pool size is 5 which should be good enough. - Threads are not started unless used. - --> - <property name="systemExecutorService"> - <bean class="org.apache.ignite.thread.IgniteThreadPoolExecutor"> - <constructor-arg type="int" value="400"/> - <constructor-arg type="int" value="400"/> - <constructor-arg type="long"> - <util:constant static-field="java.lang.Long.MAX_VALUE"/> - </constructor-arg> - <constructor-arg type="java.util.concurrent.BlockingQueue"> - <bean class="java.util.concurrent.LinkedBlockingQueue"/> - </constructor-arg> - </bean> - </property> - - <!-- - Uncomment if you plan to populate cache form lifecycle bean. 
- --> - <!-- - <property name="lifecycleBeans"> - <bean class="com.accenture.collocation.AccentureLifecycleBean"/> - </property> - --> - - <property name="cacheConfiguration"> - <!-- - Specify list of cache configurations here. Any property from - CacheConfiguration interface can be configured here. - Note that absolutely all configuration properties are optional. - --> - <list> - <!-- - Partitioned cache example configuration. - --> - <bean class="org.apache.ignite.cache.CacheConfiguration"> - <property name="name" value="partitioned"/> - - <property name="cacheMode" value="PARTITIONED"/> - - <!-- Initial cache size. --> - <property name="startSize"> - <util:constant static-field="org.gridgain.loadtests.colocation.GridTestConstants.CACHE_INIT_SIZE"/> - </property> - - <!-- - This shows how to configure number of backups. The below configuration - sets the number of backups to 1 (which is default). - --> - <property name="affinity"> - <bean class="org.apache.ignite.cache.affinity.consistenthash.GridCacheConsistentHashAffinityFunction"> - <property name="keyBackups" value="0"/> <!-- Disable backups. --> - </bean> - </property> - - <!-- Set synchronous preloading (default is asynchronous). --> - <property name="preloadMode" value="SYNC"/> - - <property name="distributionMode" value="PARTITIONED_ONLY"/> - - <property name="queryIndexEnabled" value="false"/> - - <property name="store"> - <bean class="org.gridgain.loadtests.colocation.GridTestCacheStore"/> - </property> - </bean> - </list> - </property> - - <!-- - Uncomment this to provide TCP discovery SPI (predefined addresses). - Use the addresses list to provide IP addresses of initial nodes in the grid - (at least one address must be provided). 
- --> - <property name="discoverySpi"> - <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> - <property name="ipFinder"> - <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> - <property name="addresses"> - <list> - <!-- - List all IP/port configurations that potentially - can be started first in examples. We are assuming - grid of size 10 or less. - --> - <value>127.0.0.1:47500</value> - <value>127.0.0.1:47501</value> - <value>127.0.0.1:47502</value> - <value>127.0.0.1:47503</value> - <value>127.0.0.1:47504</value> - <value>127.0.0.1:47505</value> - <value>127.0.0.1:47506</value> - <value>127.0.0.1:47507</value> - <value>127.0.0.1:47508</value> - <value>127.0.0.1:47509</value> - </list> - </property> - </bean> - </property> - </bean> - </property> - </bean> -</beans> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/communication/GridIoManagerBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/gridgain/loadtests/communication/GridIoManagerBenchmark.java deleted file mode 100644 index f4ec732..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/communication/GridIoManagerBenchmark.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.communication; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.managers.discovery.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.loadtests.util.*; -import org.gridgain.testframework.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; -import static org.gridgain.testframework.GridLoadTestUtils.*; - -/** - * By default this benchmarks uses original GridGain configuration - * with message dispatching from NIO threads. - * - * By changing {@link #DFLT_CONFIG} constant you can use ForkJoin thread pool instead of JDK default. - * - * Note that you should run 2 processes of this test to get it running. 
- */ -public class GridIoManagerBenchmark { - /** */ - public static final String DFLT_CONFIG = "modules/tests/config/io-manager-benchmark.xml"; - - /** */ - private static final int DFLT_THREADS = 2; - - /** */ - private static final long WARM_UP_DUR = 30 * 1000; - - /** */ - private static final Semaphore sem = new Semaphore(10 * 1024); - - /** */ - public static final int TEST_TOPIC = 1; - - /** */ - private static final LongAdder msgCntr = new LongAdder(); - - /** */ - private static final Map<IgniteUuid, CountDownLatch> latches = new ConcurrentHashMap8<>(); - - /** */ - private static final byte[][] arrs; - - /** */ - private static boolean testHeavyMsgs; - - /** */ - private static boolean testLatency; - - /** - * - */ - static { - ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); - - arrs = new byte[64][]; - - for (int i = 0; i < arrs.length; i++) { - byte[] arr = new byte[rnd.nextInt(4096, 8192)]; - - for (int j = 0; j < arr.length; j++) - arr[j] = (byte)rnd.nextInt(0, 127); - - arrs[i] = arr; - } - } - - /** - * @param args Command line arguments. - * @throws IgniteCheckedException If failed. - */ - public static void main(String[] args) throws IgniteCheckedException { - int threads = args.length > 0 ? Integer.parseInt(args[0]) : DFLT_THREADS; - int duration = args.length > 1 ? Integer.parseInt(args[1]) : 0; - String outputFilename = args.length > 2 ? args[2] : null; - String path = args.length > 3 ? 
args[3] : DFLT_CONFIG; - testHeavyMsgs = args.length > 4 && "true".equalsIgnoreCase(args[4]); - testLatency = args.length > 5 && "true".equalsIgnoreCase(args[5]); - -// threads = 128; -// testLatency = true; -// testHeavyMsgs = true; - - X.println("Config: " + path); - X.println("Test heavy messages: " + testHeavyMsgs); - X.println("Test latency: " + testLatency); - X.println("Threads: " + threads); - X.println("Duration: " + duration); - X.println("Output file name: " + outputFilename); - - GridKernal g = (GridKernal)G.start(path); - - if (g.localNode().order() > 1) { - try { - sendMessages(g, threads, duration, outputFilename); - } - finally { - G.stopAll(false); - } - } - else - receiveMessages(g); - } - - /** - * @param g Kernal. - * @param threads Number of send threads. - * @param duration Test duration. - * @param outputFilename Output file name. - */ - @SuppressWarnings("deprecation") - private static void sendMessages(GridKernal g, int threads, int duration, @Nullable final String outputFilename) { - X.println(">>> Sending messages."); - - g.context().io().addMessageListener(TEST_TOPIC, new SenderMessageListener()); - - Thread collector = startDaemon(new Runnable() { - @Override public void run() { - final long initTs = System.currentTimeMillis(); - long ts = initTs; - long queries = msgCntr.sum(); - GridCumulativeAverage qpsAvg = new GridCumulativeAverage(); - - try { - while (!Thread.currentThread().isInterrupted()) { - U.sleep(10000); - - long newTs = System.currentTimeMillis(); - long newQueries = msgCntr.sum(); - - long executed = newQueries - queries; - long time = newTs - ts; - - long qps = executed * 1000 / time; - - boolean recordAvg = ts - initTs > WARM_UP_DUR; - - if (recordAvg) qpsAvg.update(qps); - - X.println("Communication benchmark [qps=" + qps + (recordAvg ? 
", qpsAvg=" + qpsAvg : "") + - ", executed=" + executed + ", time=" + time + ']'); - - ts = newTs; - queries = newQueries; - } - } - catch (IgniteInterruptedException ignored) { - // No-op. - } - - X.println("Average QPS: " + qpsAvg); - - if (outputFilename != null) { - try { - X.println("Saving results to output file: " + outputFilename); - - appendLineToFile(outputFilename, "%s,%d", GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date - ()), qpsAvg.get()); - } - catch (IOException e) { - X.println("Failed to record results to a file: " + e.getMessage()); - } - } - } - }); - - Collection<SendThread> sndThreads = new ArrayList<>(threads); - - for (int i = 0; i < threads; i++) { - SendThread t = new SendThread(g); - - sndThreads.add(t); - - t.start(); - } - - try { - U.sleep(duration > 0 ? duration * 1000 + WARM_UP_DUR : Long.MAX_VALUE); - } - catch (IgniteInterruptedException ignored) { - // No-op. - } - - collector.interrupt(); - - for (SendThread t : sndThreads) - t.interrupt(); - } - - /** - * @param g Kernal. - */ - @SuppressWarnings("deprecation") - private static void receiveMessages(final GridKernal g) { - X.println(">>> Receiving messages."); - - final GridIoManager io = g.context().io(); - - GridMessageListener lsnr = new GridMessageListener() { - private ClusterNode node; - - @Override public void onMessage(UUID nodeId, Object msg) { - if (node == null) - node = g.context().discovery().node(nodeId); - - GridTestMessage testMsg = ((GridTestMessage)msg); - - testMsg.bytes(null); - - try { - io.send(node, TEST_TOPIC, testMsg, PUBLIC_POOL); - } - catch (IgniteCheckedException e) { - e.printStackTrace(); - } - } - }; - - io.addMessageListener(TEST_TOPIC, lsnr); - } - - /** - * - */ - private static class SendThread extends Thread { - /** */ - private final GridKernal g; - - /** - * @param g Kernal. 
- */ - SendThread(GridKernal g) { - this.g = g; - } - - /** {@inheritDoc} */ - @Override public void run() { - try { - ClusterNode dst = awaitOther(g.context().discovery()); - - GridIoManager io = g.context().io(); - - Random rnd = ThreadLocalRandom8.current(); - - IgniteUuid msgId = IgniteUuid.randomUuid(); - - while (!Thread.interrupted()) { - CountDownLatch latch = null; - - if (testLatency) - latches.put(msgId, latch = new CountDownLatch(1)); - else - sem.acquire(); - - io.send( - dst, - TEST_TOPIC, - new GridTestMessage(msgId, testHeavyMsgs ? arrs[rnd.nextInt(arrs.length)] : null), - PUBLIC_POOL); - - if (testLatency && !latch.await(1000, MILLISECONDS)) - throw new RuntimeException("Failed to await latch."); - } - } - catch (IgniteCheckedException e) { - e.printStackTrace(); - } - catch (InterruptedException ignored) { - // No-op. - } - } - - /** - * @param disc Discovery. - * @return Second node in the topology. - * @throws InterruptedException If interrupted. - */ - @SuppressWarnings("BusyWait") - private ClusterNode awaitOther(final GridDiscoveryManager disc) throws InterruptedException { - while (disc.allNodes().size() < 2) - Thread.sleep(1000); - - for (ClusterNode node : disc.allNodes()) - if (!F.eqNodes(node, disc.localNode())) - return node; - - assert false; - - return null; - } - } - - /** - * - */ - private static class SenderMessageListener implements GridMessageListener { - /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { - msgCntr.increment(); - - if (testLatency) - latches.get(((GridTestMessage)msg).id()).countDown(); - else - sem.release(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/communication/GridIoManagerBenchmark0.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/communication/GridIoManagerBenchmark0.java 
b/modules/core/src/test/java/org/gridgain/loadtests/communication/GridIoManagerBenchmark0.java deleted file mode 100644 index 6295665..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/communication/GridIoManagerBenchmark0.java +++ /dev/null @@ -1,469 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.loadtests.communication; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.spi.communication.*; -import org.apache.ignite.spi.communication.tcp.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.testframework.*; -import org.gridgain.testframework.junits.common.*; -import org.jdk8.backport.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; - -/** - * - */ -public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { - /** */ - public static final int CONCUR_MSGS = 10 * 1024; - - /** */ - private static final int THREADS = 2; - - /** */ - private static final long TEST_TIMEOUT = 3 * 60 * 1000; - - /** */ - private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGridsMultiThreaded(2); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(ipFinder); - - c.setDiscoverySpi(discoSpi); - - c.setCommunicationSpi(getCommunication()); - - return c; - } - - /** - * 
@param len Length. - * @return Test string. - */ - private static String generateTestString(int len) { - assert len > 0; - SB sb = new SB(); - - for (int i = 0; i < len; i++) - sb.a(Character.forDigit(i % 10, 10)); - - return sb.toString(); - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("deprecation") - public void testThroughput() throws Exception { - final GridKernal sndKernal = (GridKernal)grid(0); - final GridKernal rcvKernal = (GridKernal)grid(1); - - final ClusterNode sndNode = sndKernal.localNode(); - final ClusterNode rcvNode = rcvKernal.localNode(); - - final GridIoManager snd = sndKernal.context().io(); - final GridIoManager rcv = rcvKernal.context().io(); - - info("Senders: " + THREADS); - info("Messages: " + CONCUR_MSGS); - - final Semaphore sem = new Semaphore(CONCUR_MSGS); - final LongAdder msgCntr = new LongAdder(); - - final String topic = "test-topic"; - - rcv.addMessageListener( - topic, - new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - try { - rcv.send(sndNode, topic, (GridTcpCommunicationMessageAdapter)msg, PUBLIC_POOL); - } - catch (IgniteCheckedException e) { - error("Failed to send message.", e); - } - } - }); - - snd.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - msgCntr.increment(); - - sem.release(); - } - }); - - Timer t = new Timer("results-reporter"); - - t.schedule(new TimerTask() { - private long ts = System.currentTimeMillis(); - - @Override public void run() { - long newTs = System.currentTimeMillis(); - long qrys = msgCntr.sumThenReset(); - - long time = newTs - ts; - - X.println("Communication benchmark [qps=" + qrys * 1000 / time + - ", executed=" + qrys + ", time=" + time + ']'); - - ts = newTs; - } - }, 10000, 10000); - - final AtomicBoolean finish = new AtomicBoolean(); - - IgniteFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Override public Object call() throws 
Exception { - try { - IgniteUuid msgId = IgniteUuid.randomUuid(); - - while (!finish.get()) { - sem.acquire(); - - snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL); - } - } - catch (IgniteCheckedException e) { - X.println("Message send failed", e); - } - catch (InterruptedException ignored) { - // No-op. - } - - return null; - } - }, THREADS, "send-thread"); - - Thread.sleep(TEST_TIMEOUT); - - finish.set(true); - - sem.release(CONCUR_MSGS * 2); - - t.cancel(); - - f.get(); - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("deprecation") - public void testLatency() throws Exception { - final GridKernal sndKernal = (GridKernal)grid(0); - final GridKernal rcvKernal = (GridKernal)grid(1); - - final ClusterNode sndNode = sndKernal.localNode(); - final ClusterNode rcvNode = rcvKernal.localNode(); - - final GridIoManager snd = sndKernal.context().io(); - final GridIoManager rcv = rcvKernal.context().io(); - - final LongAdder msgCntr = new LongAdder(); - - final Integer topic = 1; - - final Map<IgniteUuid, CountDownLatch> map = new ConcurrentHashMap8<>(); - - rcv.addMessageListener( - topic, - new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - try { - rcv.send(sndNode, topic, (GridTcpCommunicationMessageAdapter)msg, PUBLIC_POOL); - } - catch (IgniteCheckedException e) { - error("Failed to send message.", e); - } - } - }); - - snd.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - map.get(((GridTestMessage)msg).id()).countDown(); - } - }); - - Timer t = new Timer("results-reporter"); - - t.schedule(new TimerTask() { - private long ts = System.currentTimeMillis(); - - @Override public void run() { - long newTs = System.currentTimeMillis(); - long qrys = msgCntr.sumThenReset(); - - long time = newTs - ts; - - X.println("Communication benchmark [qps=" + qrys * 1000 / time + - ", executed=" + qrys + ", time=" + time + ']'); 
- - ts = newTs; - } - }, 10000, 10000); - - final AtomicBoolean finish = new AtomicBoolean(); - - IgniteFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - try { - IgniteUuid msgId = IgniteUuid.randomUuid(); - - while (!finish.get()) { - CountDownLatch latch = new CountDownLatch(1); - - map.put(msgId, latch); - - snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL); - - latch.await(); - - msgCntr.increment(); - } - } - catch (IgniteCheckedException e) { - X.println("Message send failed", e); - } - catch (InterruptedException ignored) { - // No-op. - } - - return null; - } - }, 1, "send-thread"); - - Thread.sleep(TEST_TIMEOUT); - - finish.set(true); - - t.cancel(); - - f.get(); - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("deprecation") - public void testVariableLoad() throws Exception { - final GridKernal sndKernal = (GridKernal)grid(0); - final GridKernal rcvKernal = (GridKernal)grid(1); - - final ClusterNode sndNode = sndKernal.localNode(); - final ClusterNode rcvNode = rcvKernal.localNode(); - - final GridIoManager snd = sndKernal.context().io(); - final GridIoManager rcv = rcvKernal.context().io(); - - info("Senders: " + THREADS); - info("Messages: " + CONCUR_MSGS); - - final Semaphore sem = new Semaphore(CONCUR_MSGS); - final LongAdder msgCntr = new LongAdder(); - - final String topic = "test-topic"; - - final Map<IgniteUuid, CountDownLatch> latches = new ConcurrentHashMap8<>(); - - rcv.addMessageListener( - topic, - new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - try { - rcv.send(sndNode, topic, (GridTcpCommunicationMessageAdapter)msg, PUBLIC_POOL); - } - catch (IgniteCheckedException e) { - error("Failed to send message.", e); - } - } - }); - - snd.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - msgCntr.increment(); - - 
sem.release(); - - CountDownLatch latch = latches.get(((GridTestMessage)msg).id()); - - if (latch != null) - latch.countDown(); - } - }); - - final AtomicBoolean finish = new AtomicBoolean(); - final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(); - - IgniteFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!finish.get()) { - CountDownLatch latch = latchRef.get(); - - if (latch != null) - U.await(latch); - - IgniteUuid msgId = IgniteUuid.randomUuid(); - - sem.acquire(); - - snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL); - } - - return null; - } - }, THREADS, "send-thread"); - - IgniteFuture<?> f1 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - private long ts = System.currentTimeMillis(); - - @Override public Object call() throws Exception { - try { - while (!finish.get()) { - info(U.nl() + ">>>" + U.nl() + ">>> High load." + U.nl() + ">>>"); - - U.sleep(15 * 1000); - - reportNumbers(); - - info(U.nl() + ">>>" + U.nl() + ">>> Low load." + U.nl() + ">>>"); - - CountDownLatch latch = new CountDownLatch(1); - - try { - // Here will be a pause. 
- latchRef.set(latch); - - U.sleep(7 * 1000); - - reportNumbers(); - } - finally { - latch.countDown(); - } - } - } - catch (IgniteCheckedException e) { - X.println("Message send failed", e); - } - - return null; - } - - /** - * - */ - void reportNumbers() { - long newTs = System.currentTimeMillis(); - long qrys = msgCntr.sumThenReset(); - - long time = newTs - ts; - - X.println("Communication benchmark [qps=" + qrys * 1000 / time + - ", executed=" + qrys + ", time=" + time + ']'); - - ts = newTs; - - } - }, 1, "load-dispatcher"); - - IgniteFuture<?> f2 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!finish.get()) { - U.sleep(1000); - - IgniteUuid msgId = IgniteUuid.randomUuid(); - CountDownLatch latch = new CountDownLatch(1); - - latches.put(msgId, latch); - - snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL); - - long start = System.currentTimeMillis(); - - latch.await(); - - info("Response time: " + (System.currentTimeMillis() - start)); - } - - return null; - } - }, THREADS, "low-loader"); - - Thread.sleep(TEST_TIMEOUT); - - finish.set(true); - - sem.release(CONCUR_MSGS * 2); - - f.get(); - f1.get(); - f2.get(); - } - - /** - * @return SPI instance. 
- */ - private CommunicationSpi getCommunication() { - TcpCommunicationSpi spi = new TcpCommunicationSpi(); - - spi.setTcpNoDelay(true); - spi.setSharedMemoryPort(-1); - spi.setConnectionBufferSize(0); - - info("Comm SPI: " + spi); - - return spi; - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return TEST_TIMEOUT + 60 * 1000; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/communication/GridTcpCommunicationBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/communication/GridTcpCommunicationBenchmark.java b/modules/core/src/test/java/org/gridgain/loadtests/communication/GridTcpCommunicationBenchmark.java deleted file mode 100644 index a8c0287..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/communication/GridTcpCommunicationBenchmark.java +++ /dev/null @@ -1,586 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.loadtests.communication; -// -//import org.gridgain.grid.*; -//import org.gridgain.grid.lang.*; -//import org.gridgain.grid.lang.utils.*; -//import org.gridgain.grid.spi.*; -//import org.gridgain.grid.spi.communication.*; -//import org.gridgain.grid.spi.communication.tcp.*; -//import org.gridgain.grid.typedef.*; -//import org.gridgain.grid.typedef.internal.*; -//import org.gridgain.testframework.*; -//import org.gridgain.testframework.junits.*; -//import org.gridgain.testframework.junits.spi.*; -// -//import java.util.*; -//import java.util.concurrent.*; -//import java.util.concurrent.atomic.*; -// -///** */ -//@GridSpiTest(spi = GridTcpCommunicationSpi.class, group = "TCP communication SPI benchmark.") -//public class GridTcpCommunicationBenchmark extends GridSpiAbstractTest<GridTcpCommunicationSpi> { -// /** */ -// public static final int CONCUR_MSGS = 10 * 1024; -// -// /** */ -// private static final int THREADS = 1; -// -// /** */ -// private static final long TEST_TIMEOUT = 3 * 60 * 1000; -// -// /** */ -// private final Collection<GridTestResources> spiRsrcs = new ArrayList<>(); -// -// /** */ -// private final Map<UUID, GridCommunicationSpi> spis = new HashMap<>(); -// -// /** */ -// private final Collection<GridNode> nodes = new ArrayList<>(); -// -// /** -// * Disable automatic test SPI start. 
-// */ -// public GridTcpCommunicationBenchmark() { -// super(false); -// } -// -// /** {@inheritDoc} */ -// @Override protected void beforeTest() throws Exception { -// Map<GridNode, GridSpiTestContext> ctxs = new HashMap<>(); -// -// for (int i = 0; i < 2; i++) { -// GridCommunicationSpi spi = getCommunication(); -// -// GridTestResources rsrcs = new GridTestResources(); -// -// GridTestNode node = new GridTestNode(rsrcs.getNodeId()); -// -// GridSpiTestContext ctx = initSpiContext(); -// -// ctx.setLocalNode(node); -// -// spiRsrcs.add(rsrcs); -// -// rsrcs.inject(spi); -// -// node.setAttributes(spi.getNodeAttributes()); -// -// nodes.add(node); -// -// spi.spiStart(getTestGridName() + (i + 1)); -// -// spis.put(rsrcs.getNodeId(), spi); -// -// spi.onContextInitialized(ctx); -// -// ctxs.put(node, ctx); -// } -// -// // For each context set remote nodes. -// for (Map.Entry<GridNode, GridSpiTestContext> e : ctxs.entrySet()) { -// for (GridNode n : nodes) { -// if (!n.equals(e.getKey())) -// e.getValue().remoteNodes().add(n); -// } -// } -// } -// -// /** {@inheritDoc} */ -// @Override protected void afterTest() throws Exception { -// for (GridCommunicationSpi spi : spis.values()) { -// spi.setListener(null); -// -// spi.spiStop(); -// } -// -// for (GridTestResources rsrcs : spiRsrcs) -// rsrcs.stopThreads(); -// } -// -// /** -// * @param len Length. -// * @return Test string. -// */ -// private static String generateTestString(int len) { -// assert len > 0; -// SB sb = new SB(); -// -// for (int i = 0; i < len; i++) -// sb.a(Character.forDigit(i % 10, 10)); -// -// return sb.toString(); -// } -// -// /** -// * @throws Exception If failed. 
-// */ -// @SuppressWarnings("deprecation") -// public void testThroughput() throws Exception { -// assert spis.size() == 2; -// assert nodes.size() == 2; -// -// Iterator<GridNode> it = nodes.iterator(); -// -// final GridNode sndNode = it.next(); -// final GridNode rcvNode = it.next(); -// -// final GridCommunicationSpi sndComm = spis.get(sndNode.id()); -// final GridCommunicationSpi rcvComm = spis.get(rcvNode.id()); -// -// final String testStr = generateTestString(66); -// -// info("Test string length: " + testStr.length()); -// info("Senders: " + THREADS); -// info("Messages: " + CONCUR_MSGS); -// -// final Semaphore sem = new Semaphore(CONCUR_MSGS); -// final LongAdder msgCntr = new LongAdder(); -// -// rcvComm.setListener(new GridCommunicationListener() { -// @Override public void onMessage(UUID nodeId, byte[] msg, GridAbsClosure msgC) { -// try { -// byte[] res = U.join(U.intToBytes(msg.length), msg); -// -// rcvComm.sendMessage(sndNode, res, 0, res.length); -// } -// catch (GridSpiException e) { -// log.error("Message echo failed.", e); -// } -// finally { -// msgC.apply(); -// } -// } -// }); -// -// sndComm.setListener(new GridCommunicationListener() { -// @Override public void onMessage(UUID nodeId, byte[] msg, GridAbsClosure msgC) { -// msgCntr.increment(); -// -// sem.release(); -// -// msgC.apply(); -// } -// }); -// -// Timer t = new Timer("results-reporter"); -// -// t.schedule(new TimerTask() { -// private long ts = System.currentTimeMillis(); -// -// @Override public void run() { -// long newTs = System.currentTimeMillis(); -// long qrys = msgCntr.sumThenReset(); -// -// long time = newTs - ts; -// -// X.println("Communication benchmark [qps=" + qrys * 1000 / time + -// ", executed=" + qrys + ", time=" + time + ']'); -// -// ts = newTs; -// } -// }, 10000, 10000); -// -// final AtomicBoolean finish = new AtomicBoolean(); -// -// GridFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { -// @Override public Object call() 
throws Exception { -// try { -// while (!finish.get()) { -// GridUuid msgId = GridUuid.randomUuid(); -// -// sem.acquire(); -// -// // Loading message with additional data, to get results, -// // comparable with IoManager benchmark. -// GridTuple2<byte[], Integer> t = getTestResources().getMarshaller(). -// marshalNoCopy(new GridTestMessage(msgId, testStr), 4); -// -// byte[] buf = t.get1(); -// int len = t.get2(); -// -// U.intToBytes(len - 4, buf, 0); -// -// sndComm.sendMessage(rcvNode, buf, 0, len); -// } -// } -// catch (IgniteCheckedException e) { -// X.println("Message send failed", e); -// } -// catch (InterruptedException ignored) { -// // No-op. -// } -// -// return null; -// } -// }, THREADS, "send-thread"); -// -// Thread.sleep(TEST_TIMEOUT); -// -// finish.set(true); -// -// sem.release(CONCUR_MSGS * 2); -// -// t.cancel(); -// -// f.get(); -// } -// -// /** -// * @throws Exception If failed. -// */ -// @SuppressWarnings("deprecation") -// public void testLatency() throws Exception { -// assert spis.size() == 2; -// assert nodes.size() == 2; -// -// Iterator<GridNode> it = nodes.iterator(); -// -// final GridNode sndNode = it.next(); -// final GridNode rcvNode = it.next(); -// -// final GridCommunicationSpi sndComm = spis.get(sndNode.id()); -// final GridCommunicationSpi rcvComm = spis.get(rcvNode.id()); -// -// final String testStr = generateTestString(66); -// -// info("Test string length: " + testStr.length()); -// -// final LongAdder msgCntr = new LongAdder(); -// -// final Map<GridUuid, CountDownLatch> map = new ConcurrentHashMap8<>(); -// -// rcvComm.setListener(new GridCommunicationListener() { -// @Override public void onMessage(UUID nodeId, byte[] msg, GridAbsClosure msgC) { -// try { -// byte[] res = U.join(U.intToBytes(msg.length), msg); -// -// rcvComm.sendMessage(sndNode, res, 0, res.length); -// } -// catch (GridSpiException e) { -// log.error("Message echo failed.", e); -// } -// finally { -// msgC.apply(); -// } -// } -// }); -// -// 
final ClassLoader clsLdr = getClass().getClassLoader(); -// -// sndComm.setListener(new GridCommunicationListener() { -// @Override public void onMessage(UUID nodeId, byte[] msg, GridAbsClosure msgC) { -// try { -// GridTestMessage testMsg = getTestResources().getMarshaller().unmarshal(msg, clsLdr); -// -// map.get(testMsg.id()).countDown(); -// } -// catch (IgniteCheckedException e) { -// U.error(log, "Failed to ", e); -// } -// finally { -// msgC.apply(); -// } -// } -// }); -// -// Timer t = new Timer("results-reporter"); -// -// t.schedule(new TimerTask() { -// private long ts = System.currentTimeMillis(); -// -// @Override public void run() { -// long newTs = System.currentTimeMillis(); -// long qrys = msgCntr.sumThenReset(); -// -// long time = newTs - ts; -// -// X.println("Communication benchmark [qps=" + qrys * 1000 / time + -// ", executed=" + qrys + ", time=" + time + ']'); -// -// ts = newTs; -// } -// }, 10000, 10000); -// -// final AtomicBoolean finish = new AtomicBoolean(); -// -// GridFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { -// @Override public Object call() throws Exception { -// info("Test thread started."); -// -// try { -// GridUuid msgId = GridUuid.randomUuid(); -// -// GridTuple2<byte[], Integer> t = getTestResources().getMarshaller(). -// marshalNoCopy(new GridTestMessage(msgId, testStr), 4); -// -// byte[] buf = t.get1(); -// int len = t.get2(); -// -// U.intToBytes(len - 4, buf, 0); -// -// while (!finish.get()) { -// // Loading message with additional data, to get results, -// // comparable with IoManager benchmark. -// CountDownLatch latch = new CountDownLatch(1); -// -// map.put(msgId, latch); -// -// sndComm.sendMessage(rcvNode, buf, 0, len); -// -// latch.await(); -// -// msgCntr.increment(); -// } -// } -// catch (IgniteCheckedException e) { -// X.println("Message send failed", e); -// } -// catch (InterruptedException ignored) { -// // No-op. 
-// } -// -// return null; -// } -// }, 2, "send-thread"); -// -// Thread.sleep(TEST_TIMEOUT); -// -// finish.set(true); -// -// t.cancel(); -// -// f.get(); -// } -// -// /** -// * @throws Exception If failed. -// */ -// @SuppressWarnings("deprecation") -// public void testVariableLoad() throws Exception { -// assert spis.size() == 2; -// assert nodes.size() == 2; -// -// Iterator<GridNode> it = nodes.iterator(); -// -// final GridNode sndNode = it.next(); -// final GridNode rcvNode = it.next(); -// -// final GridCommunicationSpi sndComm = spis.get(sndNode.id()); -// final GridCommunicationSpi rcvComm = spis.get(rcvNode.id()); -// -// final String testStr = generateTestString(16); -// -// info("Test string length: " + testStr.length()); -// info("Senders: " + THREADS); -// info("Messages: " + CONCUR_MSGS); -// -// final Semaphore sem = new Semaphore(CONCUR_MSGS); -// final LongAdder msgCntr = new LongAdder(); -// -// final Map<GridUuid, CountDownLatch> latches = new ConcurrentHashMap8<>(); -// -// rcvComm.setListener(new GridCommunicationListener() { -// @Override public void onMessage(UUID nodeId, byte[] msg, GridAbsClosure msgC) { -// try { -// byte[] res = U.join(U.intToBytes(msg.length), msg); -// -// rcvComm.sendMessage(sndNode, res, 0, res.length); -// } -// catch (GridSpiException e) { -// log.error("Message echo failed.", e); -// } -// finally { -// msgC.apply(); -// } -// } -// }); -// -// sndComm.setListener(new GridCommunicationListener() { -// @Override public void onMessage(UUID nodeId, byte[] buf, GridAbsClosure msgC) { -// msgCntr.increment(); -// -// sem.release(); -// -// GridTestMessage msg = null; -// -// try { -// msg = getTestResources().getMarshaller().unmarshal(buf, U.gridClassLoader()); -// } -// catch (IgniteCheckedException e) { -// U.error(log, "Failed to unmarshal message.", e); -// -// fail(); -// } -// finally { -// msgC.apply(); -// } -// -// CountDownLatch latch = latches.get(msg.id()); -// -// if (latch != null) -// 
latch.countDown(); -// } -// }); -// -// final AtomicBoolean finish = new AtomicBoolean(); -// final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(); -// -// GridFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { -// @Override public Object call() throws Exception { -// while (!finish.get()) { -// CountDownLatch latch = latchRef.get(); -// -// if (latch != null) -// U.await(latch); -// -// GridUuid msgId = GridUuid.randomUuid(); -// -// sem.acquire(); -// -// // Loading message with additional data, to get results, -// // comparable with IoManager benchmark. -// GridTuple2<byte[], Integer> t = getTestResources().getMarshaller(). -// marshalNoCopy(new GridTestMessage(msgId, testStr), 4); -// -// byte[] buf = t.get1(); -// int len = t.get2(); -// -// U.intToBytes(len - 4, buf, 0); -// -// sndComm.sendMessage(rcvNode, buf, 0, len); -// } -// -// return null; -// } -// }, THREADS, "send-thread"); -// -// GridFuture<?> f1 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { -// private long ts = System.currentTimeMillis(); -// -// @Override public Object call() throws Exception { -// try { -// while (!finish.get()) { -// info(U.nl() + ">>>" + U.nl() + ">>> High load." + U.nl() + ">>>"); -// -// U.sleep(15 * 1000); -// -// reportNumbers(); -// -// info(U.nl() + ">>>" + U.nl() + ">>> Low load." + U.nl() + ">>>"); -// -// CountDownLatch latch = new CountDownLatch(1); -// -// try { -// // Here will be a pause. 
-// latchRef.set(latch); -// -// U.sleep(7 * 1000); -// -// reportNumbers(); -// } -// finally { -// latch.countDown(); -// } -// } -// } -// catch (IgniteCheckedException e) { -// X.println("Message send failed", e); -// } -// -// return null; -// } -// -// /** -// * -// */ -// void reportNumbers() { -// long newTs = System.currentTimeMillis(); -// long qrys = msgCntr.sumThenReset(); -// -// long time = newTs - ts; -// -// X.println("Communication benchmark [qps=" + qrys * 1000 / time + -// ", executed=" + qrys + ", time=" + time + ']'); -// -// ts = newTs; -// -// } -// }, 1, "load-dispatcher"); -// -// GridFuture<?> f2 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { -// @Override public Object call() throws Exception { -// while (!finish.get()) { -// U.sleep(1000); -// -// GridUuid msgId = GridUuid.randomUuid(); -// CountDownLatch latch = new CountDownLatch(1); -// -// latches.put(msgId, latch); -// -// GridTuple2<byte[], Integer> t = getTestResources().getMarshaller(). -// marshalNoCopy(new GridTestMessage(msgId, testStr), 4); -// -// byte[] buf = t.get1(); -// int len = t.get2(); -// -// U.intToBytes(len - 4, buf, 0); -// -// sndComm.sendMessage(rcvNode, buf, 0, len); -// -// long start = System.currentTimeMillis(); -// -// latch.await(); -// -// info("Response time: " + (System.currentTimeMillis() - start)); -// } -// -// return null; -// } -// }, THREADS, "low-loader"); -// -// Thread.sleep(TEST_TIMEOUT); -// -// finish.set(true); -// -// sem.release(CONCUR_MSGS * 2); -// -// f.get(); -// f1.get(); -// f2.get(); -// } -// -// /** -// * @return SPI instance. 
-// */ -// private GridCommunicationSpi getCommunication() { -// GridTcpCommunicationSpi spi = new GridTcpCommunicationSpi(); -// -// spi.setSharedMemoryPort(-1); -// spi.setTcpNoDelay(true); -// spi.setLocalAddress("127.0.0.1"); -// -// return spi; -// } -// -// /** {@inheritDoc} */ -// @Override protected long getTestTimeout() { -// return TEST_TIMEOUT + 60 * 1000; -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/communication/GridTestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/communication/GridTestMessage.java b/modules/core/src/test/java/org/gridgain/loadtests/communication/GridTestMessage.java deleted file mode 100644 index 304cb3d..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/communication/GridTestMessage.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.loadtests.communication; - -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.nio.*; - -/** - * - */ -class GridTestMessage extends GridTcpCommunicationMessageAdapter implements Externalizable { - /** */ - private IgniteUuid id; - - /** */ - private long field1; - - /** */ - private long field2; - - /** */ - private String str; - - /** */ - private byte[] bytes; - - /** - * @param id Message ID. - * @param str String. - */ - GridTestMessage(IgniteUuid id, String str) { - this.id = id; - this.str = str; - } - - /** - * @param id Message ID. - * @param bytes Bytes. - */ - GridTestMessage(IgniteUuid id, byte[] bytes) { - this.id = id; - this.bytes = bytes; - } - - /** - * For Externalizable support. - */ - public GridTestMessage() { - // No-op. - } - - /** - * @return Message ID. - */ - public IgniteUuid id() { - return id; - } - - /** - * @return Bytes. - */ - public byte[] bytes() { - return bytes; - } - - /** - * @param bytes Bytes. - */ - public void bytes(byte[] bytes) { - this.bytes = bytes; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeGridUuid(out, id); - out.writeLong(field1); - out.writeLong(field2); - U.writeString(out, str); - U.writeByteArray(out, bytes); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - id = U.readGridUuid(in); - field1 = in.readLong(); - field2 = in.readLong(); - str = U.readString(in); - bytes = U.readByteArray(in); - } - - /** {@inheritDoc} */ - @SuppressWarnings("CloneDoesntCallSuperClone") - @Override public GridTcpCommunicationMessageAdapter clone() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf) { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/continuous/GridContinuousOperationsLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/continuous/GridContinuousOperationsLoadTest.java b/modules/core/src/test/java/org/gridgain/loadtests/continuous/GridContinuousOperationsLoadTest.java deleted file mode 100644 index 09f9de8..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/continuous/GridContinuousOperationsLoadTest.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.loadtests.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.processors.cache.query.continuous.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.IgniteEventType.*; -import static org.gridgain.loadtests.util.GridLoadTestArgs.*; -import static org.gridgain.testframework.GridLoadTestUtils.*; -import static org.gridgain.testframework.GridTestUtils.*; -
/**
 * Load test for {@link org.apache.ignite.cache.query.GridCacheContinuousQuery}.
 * <p>
 * Registers {@code GG_PARALLEL_COUNT} listeners (continuous queries, or remote event
 * listeners when {@code GG_USE_QUERIES} is {@code false}), then runs generator threads
 * that put random keys into the cache while a daemon thread prints per-second
 * callback and update counts.
 */
public class GridContinuousOperationsLoadTest {
    /**
     * Main method.
     *
     * @param args Command line arguments. The optional first argument is the path to the
     *      grid configuration; defaults to {@code examples/config/example-cache.xml}.
     * @throws Exception If error occurs.
     */
    public static void main(String[] args) throws Exception {
        final String cfgPath = args.length > 0 ? args[0] : "examples/config/example-cache.xml";
        final String cacheName = getStringProperty(CACHE_NAME, "partitioned");

        // Plain ints: same getIntProperty(name, default) overload that already feeds the
        // primitive 'bufSize' / 'parallelCnt' / 'keyRange' below, so no boxing is needed.
        final int valSize = getIntProperty(VALUE_SIZE, 1024);
        final int threadsCnt = getIntProperty(THREADS_CNT, 8);
        final int testDurSec = getIntProperty(TEST_DUR_SEC, 180);

        // Percentage of entries/events rejected by the filter; validator accepts 0..100 inclusive.
        final Integer filterSkipProb = getIntProperty("GG_FILTER_SKIP_PROBABILITY", 10, new C1<Integer, String>() {
            @Nullable @Override public String apply(Integer val) {
                if (val < 0 || val > 100)
                    return "The value should be between 0 and 100.";

                return null;
            }
        });

        final boolean useQry = getBooleanProperty("GG_USE_QUERIES", true);
        final int bufSize = getIntProperty("GG_BUFFER_SIZE", 1);
        final long timeInterval = getLongProperty("GG_TIME_INTERVAL", 0);
        final int parallelCnt = getIntProperty("GG_PARALLEL_COUNT", 8);
        final int keyRange = getIntProperty("GG_KEY_RANGE", 100000);
        final long updSleepMs = getLongProperty("GG_UPDATE_SLEEP_MS", 0);
        final long filterSleepMs = getLongProperty("GG_FILTER_SLEEP_MS", 0);
        final long cbSleepMs = getLongProperty("GG_CALLBACK_SLEEP_MS", 0);

        X.println("The test will start with the following parameters:");

        dumpProperties(System.out);

        try (Ignite ignite = Ignition.start(cfgPath)) {
            final GridCache<Object, Object> cache = ignite.cache(cacheName);

            if (cache == null)
                throw new IgniteCheckedException("Cache is not configured: " + cacheName);

            // Continuous query manager. NOTE(review): the original comment says it is "used to
            // monitor queue size", but in this method it is only checked for null.
            final GridCacheContinuousQueryManager contQryMgr =
                ((GridCacheAdapter)((GridCacheProxyImpl)cache).cache()).context().continuousQueries();

            if (contQryMgr == null)
                throw new IgniteCheckedException("Could not access GridCacheContinuousQueryManager");

            final AtomicBoolean stop = new AtomicBoolean(); // Stop flag.
            final AtomicLong cbCntr = new AtomicLong(); // Callback counter.
            final AtomicLong updCntr = new AtomicLong(); // Update counter.

            for (int i = 0; i < parallelCnt; i++) {
                if (useQry) {
                    GridCacheContinuousQuery<Object, Object> qry = cache.queries().createContinuousQuery();

                    // Callback: optionally sleep, then count every delivered entry.
                    qry.callback(new PX2<UUID, Collection<Map.Entry<Object, Object>>>() {
                        @Override public boolean applyx(UUID uuid, Collection<Map.Entry<Object, Object>> entries)
                            throws IgniteInterruptedException {
                            if (cbSleepMs > 0)
                                U.sleep(cbSleepMs);

                            cbCntr.addAndGet(entries.size());

                            return true; // Continue listening.
                        }
                    });

                    // Filter: optionally sleep, then randomly skip ~filterSkipProb percent of entries.
                    qry.filter(new PX2<Object, Object>() {
                        @Override public boolean applyx(Object key, Object val) throws IgniteInterruptedException {
                            if (filterSleepMs > 0)
                                U.sleep(filterSleepMs);

                            return Math.random() * 100 >= filterSkipProb;
                        }
                    });

                    qry.bufferSize(bufSize);
                    qry.timeInterval(timeInterval);

                    qry.execute();
                }
                else {
                    // Same callback/filter logic as above, but driven by cache PUT events.
                    ignite.events().remoteListen(
                        bufSize,
                        timeInterval,
                        true,
                        new PX2<UUID, IgniteEvent>() {
                            @Override public boolean applyx(UUID uuid, IgniteEvent evt)
                                throws IgniteInterruptedException {
                                if (cbSleepMs > 0)
                                    U.sleep(cbSleepMs);

                                cbCntr.incrementAndGet();

                                return true; // Continue listening.
                            }
                        },
                        new PX1<IgniteEvent>() {
                            @Override public boolean applyx(IgniteEvent evt) throws IgniteInterruptedException {
                                if (filterSleepMs > 0)
                                    U.sleep(filterSleepMs);

                                return Math.random() * 100 >= filterSkipProb;
                            }
                        },
                        EVT_CACHE_OBJECT_PUT
                    );
                }
            }

            // Start collector thread: prints counter deltas over each one-second sleep.
            startDaemon(new Runnable() {
                @Override public void run() {
                    try {
                        while (!stop.get() && !Thread.currentThread().isInterrupted()) {
                            long cbCntr0 = cbCntr.get();
                            long updCntr0 = updCntr.get();

                            U.sleep(1000);

                            long cbDelta = cbCntr.get() - cbCntr0;
                            long updDelta = updCntr.get() - updCntr0;

                            X.println("Stats [entriesPerSec=" + cbDelta +
                                ", updatesPerSec=" + updDelta + ']');
                        }
                    }
                    catch (IgniteInterruptedException ignored) {
                        // No-op: interruption simply ends the statistics loop.
                    }
                }
            });

            X.println("Starting " + threadsCnt + " generator thread(s).");

            // Start generator threads: each puts the same byte array under random keys until stopped.
            IgniteFuture<Long> genFut = runMultiThreadedAsync(new Callable<Object>() {
                @Override public Object call() throws Exception {
                    byte[] val = new byte[valSize];
                    ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();

                    while (!stop.get() && !Thread.currentThread().isInterrupted()) {
                        Integer key = rnd.nextInt(keyRange);

                        cache.putx(key, val);

                        updCntr.incrementAndGet();

                        if (updSleepMs > 0)
                            U.sleep(updSleepMs);
                    }

                    return true;
                }
            }, threadsCnt, "load-test-generator");

            try {
                // Long arithmetic: 'testDurSec * 1000' in int would overflow for long durations.
                U.sleep(testDurSec * 1000L);
            }
            finally {
                // Always signal workers to stop, even if the sleep above was interrupted,
                // so generators do not keep writing while the grid is being closed.
                stop.set(true);
            }

            genFut.get();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java b/modules/core/src/test/java/org/gridgain/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java deleted file mode 100644 index 1b5fb7e..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.datastructures; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.consistenthash.*; -import org.apache.ignite.cache.datastructures.*; -import org.apache.ignite.cache.eviction.lru.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.gridgain.testframework.junits.common.*; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.transactions.IgniteTxConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; - -/** - * Load test for atomic long. - */ -public class GridCachePartitionedAtomicLongLoadTest extends GridCommonAbstractTest { - /** Test duration. 
*/ - private static final long DURATION = 8 * 60 * 60 * 1000; - - /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final AtomicInteger idx = new AtomicInteger(); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - c.getTransactionsConfiguration().setDefaultTxConcurrency(PESSIMISTIC); - c.getTransactionsConfiguration().setDefaultTxIsolation(REPEATABLE_READ); - - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(GridCacheMode.PARTITIONED); - cc.setStartSize(200); - cc.setPreloadMode(GridCachePreloadMode.SYNC); - cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setEvictionPolicy(new GridCacheLruEvictionPolicy<>(1000)); - cc.setBackups(1); - cc.setAffinity(new GridCacheConsistentHashAffinityFunction(true)); - cc.setAtomicSequenceReserveSize(10); - cc.setEvictSynchronized(true); - cc.setEvictNearSynchronized(true); - - c.setCacheConfiguration(cc); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - c.setDiscoverySpi(disco); - - return c; - } - - /** - * @throws Exception If failed. 
- */ - public void testLoad() throws Exception { - startGrid(); - - try { - multithreaded(new AtomicCallable(), 50); - } - finally { - stopGrid(); - } - } - - /** - * - */ - private class AtomicCallable implements Callable<Boolean> { - /** {@inheritDoc} */ - @Override public Boolean call() throws Exception { - Ignite ignite = grid(); - - GridCache cache = ignite.cache(null); - - assert cache != null; - - GridCacheAtomicSequence seq = cache.dataStructures().atomicSequence("SEQUENCE", 0, true); - - long start = System.currentTimeMillis(); - - while (System.currentTimeMillis() - start < DURATION && !Thread.currentThread().isInterrupted()) { - IgniteTx tx = cache.txStart(); - - long seqVal = seq.incrementAndGet(); - - int curIdx = idx.incrementAndGet(); - - if (curIdx % 1000 == 0) - info("Sequence value [seq=" + seqVal + ", idx=" + curIdx + ']'); - - tx.commit(); - } - - return true; - } - } -}