http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiResponse.java b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiResponse.java new file mode 100644 index 0000000..a91e774 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiResponse.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.dsi; + +import org.apache.ignite.cache.affinity.*; + +import java.io.*; + +/** + * + */ +public class GridDsiResponse implements Serializable { + /** */ + private long id; + + /** */ + @SuppressWarnings("UnusedDeclaration") + private long msgId; + + /** */ + @SuppressWarnings("UnusedDeclaration") + private long transactionId; + + /** + * @param id ID. + */ + public GridDsiResponse(long id) { + this.id = id; + } + + /** + * @param terminalId Terminal ID. + * @return Cache key. 
+ */ + public Object getCacheKey(String terminalId){ + //return new GridCacheAffinityKey<String>("RESPONSE:" + id.toString(), terminalId); + return new ResponseKey(id, terminalId); + } + + /** + * Cache key for a response. Equality and hashing use {@link #key} only; {@link #terminalId} is carried + * as the field annotated with {@code @GridCacheAffinityKeyMapped}. + */ + @SuppressWarnings("PackageVisibleInnerClass") + static class ResponseKey implements Serializable { + /** Response ID; the sole input to {@code hashCode()} and {@code equals()}. */ + private Long key; + + /** Terminal ID; annotated as the affinity-mapped field and ignored by {@code equals()}. */ + @SuppressWarnings("UnusedDeclaration") + @GridCacheAffinityKeyMapped + private String terminalId; + + /** + * @param key Response ID; must not be {@code null} because {@code hashCode()} dereferences it. + * @param terminalId Terminal ID. + */ + ResponseKey(Long key, String terminalId) { + this.key = key; + this.terminalId = terminalId; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof ResponseKey && key.equals(((ResponseKey)obj).key); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiResponseTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiResponseTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiResponseTask.java new file mode 100644 index 0000000..30558d4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiResponseTask.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.dsi; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; + +/** + * Adapter to be used by client. + */ +public class GridDsiResponseTask extends ComputeTaskSplitAdapter<GridDsiMessage, T3<Long, Integer, Integer>> { + /** {@inheritDoc} */ + @Override protected Collection<? 
extends ComputeJob> split(int arg0, GridDsiMessage msg) throws IgniteCheckedException { + return Collections.singletonList(new GridDsiPerfJob(msg)); + } + + /** {@inheritDoc} */ + @Override public T3<Long, Integer, Integer> reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results.size() == 1; + + return results.get(0).getData(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiSession.java b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiSession.java new file mode 100644 index 0000000..7a9a6b7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiSession.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.dsi; + +import org.apache.ignite.cache.affinity.*; + +import java.io.*; + +/** + * + */ +public class GridDsiSession implements Serializable{ + /** */ + private String terminalId; + + /** + * @param terminalId Terminal ID. 
+ */ + GridDsiSession(String terminalId) { + this.terminalId = terminalId; + } + + /** + * @return Cache key built from this session's terminal ID. + */ + public Object getCacheKey() { + return getCacheKey(terminalId); + } + + /** + * @param terminalId Terminal ID. + * @return Session key whose key string is the terminal ID followed by {@code "SESSION"}. + */ + public static Object getCacheKey(String terminalId) { + return new SessionKey(terminalId + "SESSION", terminalId); + } + + /** + * Cache key for a session. Equality and hashing use {@link #key} only; {@link #terminalId} is carried + * as the field annotated with {@code @GridCacheAffinityKeyMapped}. + */ + private static class SessionKey implements Serializable { + /** Key string; the sole input to {@code hashCode()} and {@code equals()}. */ + private String key; + + /** Terminal ID; annotated as the affinity-mapped field and ignored by {@code equals()}. */ + @SuppressWarnings("UnusedDeclaration") + @GridCacheAffinityKeyMapped + private String terminalId; + + /** + * @param key Key string; must not be {@code null} because {@code hashCode()} dereferences it. + * @param terminalId Terminal ID. + */ + SessionKey(String key, String terminalId) { + this.key = key; + this.terminalId = terminalId; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof SessionKey && key.equals(((SessionKey)obj).key); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/cacheget/GridBenchmarkCacheGetLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/cacheget/GridBenchmarkCacheGetLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/cacheget/GridBenchmarkCacheGetLoadTest.java new file mode 100644 index 0000000..374f02d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/cacheget/GridBenchmarkCacheGetLoadTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.dsi.cacheget; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; + +import java.util.concurrent.atomic.*; + +/** + * This is an adapted test case from DSI-49 (http://www.gridgainsystems.com/jira/browse/DSI-49). + */ +public class GridBenchmarkCacheGetLoadTest { + /** */ + private static AtomicLong cnt = new AtomicLong(); + + /** */ + private static AtomicLong latency = new AtomicLong(); + + /** */ + private static AtomicLong id = new AtomicLong(); + + private static Thread t; + + /** + * + */ + private GridBenchmarkCacheGetLoadTest() { + // No-op. + } + + /** + * @param args Args. + * @throws Exception If failed. 
+ */ + public static void main(String[] args) throws Exception { + Ignition.start("modules/core/src/test/config/load/dsi-49-server-production.xml"); + + GridCache<Long, Long> cache = Ignition.ignite("dsi").cache("PARTITIONED_CACHE"); + + stats(); + + boolean usePrj = true; + + GridCacheProjection<Long, Long> cachePrj = cache.projection(Long.class, Long.class); + + for (int i = 0; i < 5000000; i++) { + long t0 = System.currentTimeMillis(); + + cnt.incrementAndGet(); + + if (usePrj) + // This is slow + cachePrj.get(id.incrementAndGet()); + else + // This is fast + cache.get(id.incrementAndGet()); + + latency.addAndGet(System.currentTimeMillis() - t0); + } + + System.out.println("Finished test."); + + if (t != null) { + t.interrupt(); + t.join(); + } + } + + /** + * + */ + public static void stats() { + t = new Thread(new Runnable() { + @SuppressWarnings({"InfiniteLoopStatement", "BusyWait"}) + @Override public void run() { + int interval = 5; + + while (!Thread.currentThread().isInterrupted()) { + long cnt0 = cnt.get(); + long lt0 = latency.get(); + + try { + Thread.sleep(interval * 1000); + } + catch (InterruptedException e) { + System.out.println("Stat thread got interrupted: " + e); + + return; + } + + long cnt1 = cnt.get(); + long lt1 = latency.get(); + + System.out.println("Get/s: " + (cnt1 - cnt0) / interval); + System.out.println("Avg Latency: " + ((cnt1 - cnt0) > 0 ? 
(lt1 - lt0) / (cnt1 - cnt0) + + "ms" : "invalid")); + } + } + }); + + t.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/package.html b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/package.html new file mode 100644 index 0000000..e291932 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> +<!-- Package description. --> +Contains internal tests or test related classes and interfaces. 
+</body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridBoundedConcurrentLinkedHashSetLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridBoundedConcurrentLinkedHashSetLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridBoundedConcurrentLinkedHashSetLoadTest.java new file mode 100644 index 0000000..1379f89 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridBoundedConcurrentLinkedHashSetLoadTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.hashmap; + +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.loadtests.util.*; +import org.jdk8.backport.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.jdk8.backport.ConcurrentLinkedHashMap.QueuePolicy; +import static org.jdk8.backport.ConcurrentLinkedHashMap.QueuePolicy.*; + +/** + * + */ +public class GridBoundedConcurrentLinkedHashSetLoadTest { + /** */ + public static final int UPDATE_INTERVAL_SEC = 5; + + /** + * @param args Arguments. + */ + public static void main(String[] args) throws Exception { + QueuePolicy qPlc = args.length > 0 ? QueuePolicy.valueOf(args[0]) : SINGLE_Q; + int threadCnt = args.length > 1 ? Integer.valueOf(args[1]) : Runtime.getRuntime().availableProcessors(); + + X.println("Queue policy: " + qPlc); + X.println("Threads: " + threadCnt); + + ExecutorService pool = Executors.newFixedThreadPool(threadCnt); + + final Collection<IgniteUuid> set = + new GridBoundedConcurrentLinkedHashSet<>(10240, 32, 0.75f, 128, qPlc); + + X.println("Set: " + set); + + final LongAdder execCnt = new LongAdder(); + + final AtomicBoolean finish = new AtomicBoolean(); + + // Thread that measures and outputs performance statistics. 
+ Thread collector = new Thread(new Runnable() { + @SuppressWarnings({"BusyWait", "InfiniteLoopStatement"}) + @Override public void run() { + GridCumulativeAverage avgTasksPerSec = new GridCumulativeAverage(); + + try { + while (!finish.get()) { + Thread.sleep(UPDATE_INTERVAL_SEC * 1000); + + long curTasksPerSec = execCnt.sumThenReset() / UPDATE_INTERVAL_SEC; + + X.println(">>> Tasks/s: " + curTasksPerSec); + + avgTasksPerSec.update(curTasksPerSec); + } + } + catch (InterruptedException ignored) { + X.println(">>> Interrupted."); + + Thread.currentThread().interrupt(); + } + + // Report the accumulated average; it was previously computed but never printed. + X.println(">>> Average Tasks/s: " + avgTasksPerSec); + } + }); + + collector.start(); + + Collection<Callable<Object>> producers = new ArrayList<>(threadCnt); + + for (int i = 0; i < threadCnt; i++) + producers.add(new Callable<Object>() { + @SuppressWarnings({"unchecked", "InfiniteLoopStatement"}) + @Override public Object call() throws Exception { + UUID id = UUID.randomUUID(); + + try { + while (!finish.get()) { + set.add(IgniteUuid.fromUuid(id)); + + execCnt.increment(); + } + + return null; + } + catch (Throwable t) { + t.printStackTrace(); + + throw new Exception(t); + } + finally { + X.println("Thread finished."); + } + } + }); + + try { + pool.invokeAll(producers); + } + finally { + // Producers only return after failing (or when this thread is interrupted), so stop the + // collector and release the pool threads; otherwise these non-daemon threads keep the JVM alive. + finish.set(true); + + collector.interrupt(); + + pool.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java new file mode 100644 index 0000000..af1841a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.hashmap; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.datastructures.*; +import org.apache.ignite.internal.processors.cache.dr.os.*; +import org.apache.ignite.internal.processors.cache.jta.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.cache.query.continuous.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.gridgain.testframework.junits.*; + +import java.util.*; + +import static org.gridgain.testframework.junits.GridAbstractTest.*; + +/** + * Cache test context. + */ +public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { + /** + * @param ctx Context. + * @throws Exception If failed. 
+ */ + @SuppressWarnings("NullableProblems") + public GridCacheTestContext(GridTestKernalContext ctx) throws Exception { + super( + ctx, + new GridCacheSharedContext<>( + ctx, + new IgniteTxManager<K, V>(), + new GridCacheVersionManager<K, V>(), + new GridCacheMvccManager<K, V>(), + new GridCacheDeploymentManager<K, V>(), + new GridCachePartitionExchangeManager<K, V>(), + new GridCacheIoManager<K, V>() + ), + defaultCacheConfiguration(), + new GridCacheEventManager<K, V>(), + new GridCacheSwapManager<K, V>(false), + new GridCacheStoreManager<K, V>(null, + new IdentityHashMap<CacheStore, ThreadLocal>(), + null, + new CacheConfiguration()), + new GridCacheEvictionManager<K, V>(), + new GridCacheLocalQueryManager<K, V>(), + new GridCacheContinuousQueryManager<K, V>(), + new GridCacheAffinityManager<K, V>(), + new GridCacheDataStructuresManager<K, V>(), + new GridCacheTtlManager<K, V>(), + new GridOsCacheDrManager<K, V>(), + new GridCacheNoopJtaManager<K, V>()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java new file mode 100644 index 0000000..252f1bd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.hashmap; + +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.gridgain.testframework.junits.*; +import org.gridgain.testframework.junits.common.*; +import org.gridgain.testframework.junits.logger.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Tests hashmap load. + */ +@SuppressWarnings("InfiniteLoopStatement") +public class GridHashMapLoadTest extends GridCommonAbstractTest { + /** + * + */ + public void testHashMapLoad() { + Map<Integer, Integer> map = new HashMap<>(5 * 1024 * 1024); + + int i = 0; + + while (true) { + map.put(i++, i++); + + if (i % 400000 == 0) + info("Inserted objects: " + i / 2); + } + } + + /** + * + */ + public void testConcurrentHashMapLoad() { + Map<Integer, Integer> map = new ConcurrentHashMap<>(5 * 1024 * 1024); + + int i = 0; + + while (true) { + map.put(i++, i++); + + if (i % 400000 == 0) + info("Inserted objects: " + i / 2); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testMapEntry() throws Exception { + Map<Integer, GridCacheMapEntry<Integer, Integer>> map = new HashMap<>(5 * 1024 * 1024); + + int i = 0; + + GridCacheTestContext<Integer, Integer> ctx = new GridCacheTestContext<>( + new GridTestKernalContext(new GridTestLog4jLogger())); + + while (true) { + Integer key = i++; + Integer val = i++; + + map.put(key, new GridCacheMapEntry<Integer, Integer>(ctx, key, + key.hashCode(), val, null, 0, 1) { + @Override public boolean tmLock(IgniteTxEx<Integer, Integer> tx, long timeout) { + return false; + } + + @Override public void txUnlock(IgniteTxEx<Integer, Integer> tx) { + // No-op. + } + + @Override public boolean removeLock(GridCacheVersion ver) { + return false; + } + }); + + if (i % 100000 == 0) + info("Inserted objects: " + i / 2); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClient.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClient.java new file mode 100644 index 0000000..33d39b3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClient.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.job; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.loadtests.util.*; +import org.gridgain.testframework.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * + */ +public class GridJobExecutionLoadTestClient implements Callable<Object> { + /** Performance stats update interval in seconds. */ + private static final int UPDATE_INTERVAL_SEC = 10; + + /** Warm-up duration. */ + public static final int WARM_UP_DURATION = 60 * 1000; + + /** Grid. */ + private static Ignite g; + + /** Transaction count. */ + private static LongAdder txCnt = new LongAdder(); + + /** Finish flag. */ + private static volatile boolean finish; + + /** {@inheritDoc} */ + @SuppressWarnings("InfiniteLoopStatement") + @Nullable @Override public Object call() throws Exception { + IgniteCompute rmts = g.compute(g.cluster().forRemotes()); + + while (!finish) { + try { + rmts.execute(GridJobExecutionLoadTestTask.class, null); + + txCnt.increment(); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + } + + return null; + } + + /** + * @param args Args. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + GridFileLock fileLock = GridLoadTestUtils.fileLock(); + + fileLock.lock(); + + try { + final int noThreads = args.length > 0 ? 
Integer.parseInt(args[0]) : 64; + final int duration = args.length > 1 ? Integer.parseInt(args[1]) : 0; + final String outputFileName = args.length > 2 ? args[2] : null; + + X.println("Thread count: " + noThreads); + + g = G.start("modules/tests/config/jobs-load-client.xml"); + + warmUp(noThreads); + + final Thread collector = new Thread(new Runnable() { + @SuppressWarnings("BusyWait") + @Override public void run() { + GridCumulativeAverage avgTxPerSec = new GridCumulativeAverage(); + + try { + while (!finish) { + Thread.sleep(UPDATE_INTERVAL_SEC * 1000); + + long txPerSec = txCnt.sumThenReset() / UPDATE_INTERVAL_SEC; + + X.println(">>>"); + X.println(">>> Transactions/s: " + txPerSec); + + avgTxPerSec.update(txPerSec); + } + } + catch (InterruptedException ignored) { + X.println(">>> Interrupted."); + + Thread.currentThread().interrupt(); + } + + X.println(">>> Average Transactions/s: " + avgTxPerSec); + + if (outputFileName != null) { + try { + X.println("Writing results to file: " + outputFileName); + + GridLoadTestUtils.appendLineToFile( + outputFileName, + "%s,%d", + GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()), + avgTxPerSec.get() + ); + } + catch (IOException e) { + X.error("Failed to output results to file.", e); + } + } + } + }); + + X.println("Running main test..."); + + Thread timer = null; + + try { + ExecutorService pool = Executors.newFixedThreadPool(noThreads); + + Collection<Callable<Object>> clients = new ArrayList<>(noThreads); + + for (int i = 0; i < noThreads; i++) + clients.add(new GridJobExecutionLoadTestClient()); + + collector.start(); + + if (duration > 0) { + timer = new Thread(new Runnable() { + @Override public void run() { + try { + Thread.sleep(duration * 1000); + + finish = true; + } + catch (InterruptedException ignored) { + X.println(">>> Interrupted."); + } + } + }); + timer.start(); + } + + pool.invokeAll(clients); + + collector.interrupt(); + + pool.shutdown(); + } + finally { + if (collector != null && 
!collector.isInterrupted()) + collector.interrupt(); + + if (timer != null) + timer.interrupt(); + + G.stopAll(true); + } + } + finally { + fileLock.close(); + } + } + + /** + * Warms the JVM up. + * + * @param noThreads Number of threads to use. + */ + private static void warmUp(int noThreads) { + X.println("Warming up..."); + + final IgniteCompute rmts = g.compute(g.cluster().forRemotes()); + + GridLoadTestUtils.runMultithreadedInLoop(new Callable<Object>() { + @Nullable @Override public Object call() { + try { + rmts.execute(GridJobExecutionLoadTestTask.class, null); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + + return null; + } + }, noThreads, WARM_UP_DURATION); + + // Run GC on all nodes. + try { + g.compute().run(new GridAbsClosure() { + @Override public void apply() { + System.gc(); + } + }); + } + catch (IgniteCheckedException e) { + throw new IllegalStateException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java new file mode 100644 index 0000000..3a202a8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.job; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.loadtests.util.*; +import org.gridgain.testframework.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * + */ +public class GridJobExecutionLoadTestClientSemaphore implements Callable<Object> { + /** Performance stats update interval in seconds. */ + private static final int UPDATE_INTERVAL_SEC = 10; + + /** Warm-up duration. */ + public static final int WARM_UP_DURATION = 60 * 1000; + + /** Grid. */ + private static Ignite g; + + /** Transaction count. */ + private static LongAdder txCnt = new LongAdder(); + + /** Finish flag. 
*/ + private static volatile boolean finish; + + /** Bounds the number of tasks in flight; a permit is returned by the listener attached to each task future. */ + private static Semaphore tasksSem; + + /** + * Submits tasks asynchronously to remote nodes until {@code finish} is set, taking one permit from + * {@code tasksSem} per submitted task. + * + * @return Always {@code null}. + * @throws Exception If interrupted while waiting for a permit or if task submission fails. + */ + @SuppressWarnings("InfiniteLoopStatement") + @Nullable @Override public Object call() throws Exception { + final IgniteInClosure<IgniteFuture<?>> lsnr = new CI1<IgniteFuture<?>>() { + @Override public void apply(IgniteFuture<?> t) { + tasksSem.release(); + } + }; + + ClusterGroup rmts = g.cluster().forRemotes(); + + IgniteCompute comp = g.compute(rmts).enableAsync(); + + while (!finish) { + tasksSem.acquire(); + + boolean submitted = false; + + try { + comp.execute(GridJobExecutionLoadTestTask.class, null); + + ComputeTaskFuture<Object> f = comp.future(); + + f.listenAsync(lsnr); + + submitted = true; + } + finally { + // If submission failed the listener was never attached, so hand the permit back here. + if (!submitted) + tasksSem.release(); + } + + txCnt.increment(); + } + + return null; + } + + /** + * @param args Args. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + GridFileLock fileLock = GridLoadTestUtils.fileLock(); + + fileLock.lock(); + + try { + final int noThreads = args.length > 0 ? Integer.parseInt(args[0]) : + Runtime.getRuntime().availableProcessors(); + final int duration = args.length > 1 ? Integer.parseInt(args[1]) : 0; + int tasksCnt = args.length > 2 ? Integer.parseInt(args[2]) : 4069; + final String outputFileName = args.length > 3 ? 
args[3] : null; + + X.println("Thread count: " + noThreads); + X.println("Tasks count: " + tasksCnt); + + tasksSem = new Semaphore(tasksCnt); + + g = G.start("modules/tests/config/jobs-load-client.xml"); + + warmUp(noThreads); + + final Thread collector = new Thread(new Runnable() { + @SuppressWarnings("BusyWait") + @Override public void run() { + GridCumulativeAverage avgTxPerSec = new GridCumulativeAverage(); + + try { + while (!finish) { + Thread.sleep(UPDATE_INTERVAL_SEC * 1000); + + long txPerSec = txCnt.sumThenReset() / UPDATE_INTERVAL_SEC; + + X.println(">>>"); + X.println(">>> Transactions/s: " + txPerSec); + + avgTxPerSec.update(txPerSec); + } + } + catch (InterruptedException ignored) { + X.println(">>> Interrupted."); + + Thread.currentThread().interrupt(); + } + + X.println(">>> Average Transactions/s: " + avgTxPerSec); + + if (outputFileName != null) { + try { + X.println("Writing results to file: " + outputFileName); + + GridLoadTestUtils.appendLineToFile( + outputFileName, + "%s,%d", + GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()), + avgTxPerSec.get() + ); + } + catch (IOException e) { + X.error("Failed to output results to file.", e); + } + } + } + }); + + X.println("Running main test..."); + + Thread timer = null; + + try { + ExecutorService pool = Executors.newFixedThreadPool(noThreads); + + Collection<Callable<Object>> clients = new ArrayList<>(noThreads); + + for (int i = 0; i < noThreads; i++) + clients.add(new GridJobExecutionLoadTestClientSemaphore()); + + collector.start(); + + if (duration > 0) { + timer = new Thread(new Runnable() { + @Override public void run() { + try { + Thread.sleep(duration * 1000); + + finish = true; + } + catch (InterruptedException ignored) { + X.println(">>> Interrupted."); + } + } + }); + timer.start(); + } + + pool.invokeAll(clients); + + collector.interrupt(); + + pool.shutdown(); + } + finally { + if (collector != null && !collector.isInterrupted()) + collector.interrupt(); + + if (timer != null) + 
timer.interrupt(); + + G.stopAll(true); + } + } + finally { + fileLock.close(); + } + } + + /** + * Warms the JVM up. + * + * @param noThreads Number of threads to use. + */ + private static void warmUp(int noThreads) { + X.println("Warming up..."); + + final IgniteCompute rmts = g.compute(g.cluster().forRemotes()); + + GridLoadTestUtils.runMultithreadedInLoop(new Callable<Object>() { + @Nullable @Override public Object call() { + try { + rmts.execute(GridJobExecutionLoadTestTask.class, null); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + + return null; + } + }, noThreads, WARM_UP_DURATION); + + // Run GC on all nodes. + try { + g.compute().run(new GridAbsClosure() { + @Override public void apply() { + System.gc(); + } + }); + } + catch (IgniteCheckedException e) { + throw new IllegalStateException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestJob.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestJob.java new file mode 100644 index 0000000..8788cab --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestJob.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.loadtests.job;

import org.apache.ignite.*;
import org.apache.ignite.compute.*;

import java.io.*;

/**
 * Empty compute job: it carries no state, serializes nothing and returns {@code null}.
 */
public class GridJobExecutionLoadTestJob implements ComputeJob, Externalizable {
    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        // No state to restore.
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        // No state to write.
    }

    /** {@inheritDoc} */
    @Override public void cancel() {
        // Nothing to cancel.
    }

    /** {@inheritDoc} */
    @Override public Object execute() throws IgniteCheckedException {
        return null;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestServer.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestServer.java new file mode 100644 index 0000000..4bcfd6f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestServer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.loadtests.job;

import org.apache.ignite.internal.util.typedef.*;

/**
 * Entry point that starts a node from the job load test server configuration.
 */
public class GridJobExecutionLoadTestServer {
    /**
     * Starts the node.
     *
     * @param args Args (not used).
     * @throws Exception If failed.
     */
    public static void main(String[] args) throws Exception {
        G.start("modules/core/src/test/config/jobs-load-server.xml");
    }

    /**
     * Not meant to be instantiated.
     */
    private GridJobExecutionLoadTestServer() {
        // No-op.
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestTask.java new file mode 100644 index 0000000..2085696 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestTask.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.loadtests.job;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.internal.util.typedef.*;
import org.jetbrains.annotations.*;

import java.util.*;

import static org.apache.ignite.compute.ComputeJobResultPolicy.*;

/**
 * Task that maps a single {@link GridJobExecutionLoadTestJob} to the first node of the subgrid
 * and reduces to {@code null}.
 */
public class GridJobExecutionLoadTestTask implements ComputeTask<Object, Object> {
    /** {@inheritDoc} */
    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg)
        throws IgniteCheckedException {
        GridJobExecutionLoadTestJob job = new GridJobExecutionLoadTestJob();

        ClusterNode target = subgrid.get(0);

        return F.asMap(job, target);
    }

    /** {@inheritDoc} */
    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException {
        // Reduce as soon as the job result arrives.
        return REDUCE;
    }

    /** {@inheritDoc} */
    @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
        return null;
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeLoadTest.java new file mode 100644 index 0000000..3dde4ea --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeLoadTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.ignite.loadtests.job;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.loadtests.util.*;
import org.gridgain.testframework.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import static org.apache.ignite.compute.ComputeJobResultPolicy.*;

/**
 * This test measures the performance of task execution engine by
 * submitting empty tasks and collecting the average tasks/second
 * statistics.
 */
public class GridJobExecutionSingleNodeLoadTest {
    /** Stats update interval in seconds. */
    private static final int UPDATE_INTERVAL_SEC = 10;

    /** Warm-up duration. */
    public static final int WARM_UP_DURATION = 60 * 1000;

    /**
     * @param args Command line arguments:
     *             1-st: Number of worker threads. Default: 64.
     *             2-nd: Test duration in seconds. 0 means infinite. Default: 0.
     *             3-rd: File to output test results to.
     * @throws Exception If failed.
     */
    public static void main(String[] args) throws Exception {
        GridFileLock fileLock = GridLoadTestUtils.fileLock();

        fileLock.lock();

        try {
            // Command line arguments.
            int threadCnt = args.length == 0 ? 64 : Integer.parseInt(args[0]);
            final int duration = args.length < 2 ? 0 : Integer.parseInt(args[1]);
            final String outputFileName = args.length < 3 ? null : args[2];

            final AtomicLong tasksCnt = new AtomicLong();

            final AtomicBoolean finish = new AtomicBoolean();

            ExecutorService pool = Executors.newFixedThreadPool(threadCnt);

            Collection<Callable<Object>> producers = new ArrayList<>(threadCnt);

            Thread collector = null;

            Thread timer = null;

            try {
                final Ignite g = G.start("modules/core/src/test/config/grid-job-load.xml");

                X.println("Warming up...");

                GridLoadTestUtils.runMultithreadedInLoop(new Callable<Object>() {
                    @Override public Object call() throws IgniteCheckedException {
                        g.compute().execute(GridJobExecutionLoadTestTask.class, null);

                        return null;
                    }
                }, threadCnt, WARM_UP_DURATION);

                System.gc();

                X.println("Running main test.");

                for (int i = 0; i < threadCnt; i++)
                    producers.add(new Callable<Object>() {
                        @SuppressWarnings({"unchecked", "InfiniteLoopStatement"})
                        @Override public Object call() throws Exception {
                            while (!finish.get()) {
                                try {
                                    g.compute().execute(GridJobExecutionLoadTestTask.class, null);

                                    tasksCnt.incrementAndGet();
                                }
                                catch (ComputeTaskCancelledException ignored) {
                                    // No-op.
                                }
                                catch (IgniteCheckedException e) {
                                    e.printStackTrace();
                                }
                            }

                            return null;
                        }
                    });

                // Thread that measures and outputs performance statistics.
                collector = new Thread(new Runnable() {
                    @SuppressWarnings({"BusyWait", "InfiniteLoopStatement"})
                    @Override public void run() {
                        GridCumulativeAverage avgTasksPerSec = new GridCumulativeAverage();

                        try {
                            while (!finish.get()) {
                                long cnt0 = tasksCnt.get();

                                Thread.sleep(UPDATE_INTERVAL_SEC * 1000);

                                long cnt1 = tasksCnt.get();

                                long curTasksPerSec = (cnt1 - cnt0) / UPDATE_INTERVAL_SEC;
                                X.println(">>> Tasks/s: " + curTasksPerSec);

                                avgTasksPerSec.update(curTasksPerSec);
                            }
                        }
                        catch (InterruptedException ignored) {
                            X.println(">>> Interrupted.");

                            Thread.currentThread().interrupt();
                        }

                        X.println(">>> Average tasks/s: " + avgTasksPerSec);

                        if (outputFileName != null) {
                            X.println("Writing test results to a file: " + outputFileName);

                            try {
                                GridLoadTestUtils.appendLineToFile(
                                    outputFileName,
                                    "%s,%d",
                                    GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()),
                                    avgTasksPerSec.get());
                            }
                            catch (IOException e) {
                                X.error("Failed to output to a file", e);
                            }
                        }
                    }
                });
                collector.start();

                if (duration > 0) {
                    // Thread that stops the test after a specified period of time.
                    timer = new Thread(new Runnable() {
                        @Override public void run() {
                            try {
                                // Long arithmetic: "duration * 1000" overflows int for long runs.
                                Thread.sleep(duration * 1000L);

                                finish.set(true);
                            }
                            catch (InterruptedException ignored) {
                                // No-op.
                            }
                        }
                    });
                    timer.start();
                }

                pool.invokeAll(producers);

                X.println("All done, stopping.");
            }
            finally {
                if (collector != null)
                    collector.interrupt();

                if (timer != null)
                    timer.interrupt();

                // Also reached when start-up or invokeAll fails, so worker threads never outlive the test.
                pool.shutdownNow();

                G.stopAll(true);
            }
        }
        finally {
            fileLock.close();
        }
    }

    /**
     * Empty task (spawns one empty job).
     */
    private static class GridJobExecutionLoadTestTask implements ComputeTask<Object, Object> {
        /** {@inheritDoc} */
        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg)
            throws IgniteCheckedException {
            return F.asMap(new GridJobExecutionLoadTestJob(), subgrid.get(0));
        }

        /** {@inheritDoc} */
        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException {
            return REDUCE;
        }

        /** {@inheritDoc} */
        @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
            return null;
        }
    }

    /**
     * Empty job.
     */
    private static class GridJobExecutionLoadTestJob implements ComputeJob {
        /** {@inheritDoc} */
        @Override public Object execute() throws IgniteCheckedException {
            return null;
        }

        /** {@inheritDoc} */
        @Override public void cancel() {
            // No-op.
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java new file mode 100644 index 0000000..3609a2b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.loadtests.job;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.loadtests.util.*;
import org.gridgain.testframework.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.compute.ComputeJobResultPolicy.*;

/**
 * This test measures the performance of task execution engine by
 * submitting empty tasks and collecting the average tasks/second
 * statistics.
 */
public class GridJobExecutionSingleNodeSemaphoreLoadTest {
    /** Stats update interval in seconds. */
    private static final int UPDATE_INTERVAL_SEC = 10;

    /** Warm-up duration. */
    public static final int WARM_UP_DURATION = 60 * 1000;

    /**
     * @param args Command line arguments:
     *             1-st: Number of worker threads. Default equals to available CPU number / 2 (at least 1).
     *             2-nd: Concurrent tasks count. Default: 1024.
     *             3-rd: Test duration in seconds. 0 means infinite. Default: 0.
     *             4-th: File to output test results to.
     * @throws Exception If failed.
     */
    public static void main(String[] args) throws Exception {
        GridFileLock fileLock = GridLoadTestUtils.fileLock();

        fileLock.lock();

        try {
            // Command line arguments.
            //
            // NOTE: on MacOS better numbers are shown if public pool core and max sizes are
            // equal to CPU count. And producer threads count is equal to CPU count.
            //
            // "availableProcessors() / 2" is 0 on a single-CPU host, hence the lower bound.
            int threadCnt = args.length > 0 ? Integer.parseInt(args[0]) :
                Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
            int taskCnt = args.length > 1 ? Integer.parseInt(args[1]) : 1024;
            final int duration = args.length > 2 ? Integer.parseInt(args[2]) : 0;
            final String outputFileName = args.length > 3 ? args[3] : null;

            final LongAdder execCnt = new LongAdder();

            try {
                final Ignite g = G.start("modules/tests/config/grid-job-load.xml");

                X.println("Thread count: " + threadCnt);
                X.println("Task count: " + taskCnt);
                X.println("Duration: " + duration);

                X.println("Warming up...");

                g.compute().execute(GridJobExecutionLoadTestTask.class, null);
                g.compute().execute(GridJobExecutionLoadTestTask.class, null);

                runTest(g, threadCnt, taskCnt, WARM_UP_DURATION, execCnt);

                System.gc();

                execCnt.reset();

                X.println("Running main test.");

                // Periodically prints throughput and writes the average once interrupted.
                IgniteFuture<Void> collectorFut = GridTestUtils.runAsync(new Callable<Void>() {
                    @Override public Void call() throws Exception {
                        GridCumulativeAverage avgTasksPerSec = new GridCumulativeAverage();

                        try {
                            while (!Thread.currentThread().isInterrupted()) {
                                U.sleep(UPDATE_INTERVAL_SEC * 1000);

                                long curTasksPerSec = execCnt.sumThenReset() / UPDATE_INTERVAL_SEC;

                                X.println(">>> Tasks/s: " + curTasksPerSec);

                                avgTasksPerSec.update(curTasksPerSec);
                            }
                        }
                        catch (IgniteInterruptedException ignored) {
                            X.println(">>> Interrupted.");

                            Thread.currentThread().interrupt();
                        }

                        X.println(">>> Average tasks/s: " + avgTasksPerSec);

                        if (outputFileName != null) {
                            X.println("Writing test results to a file: " + outputFileName);

                            try {
                                GridLoadTestUtils.appendLineToFile(
                                    outputFileName,
                                    "%s,%d",
                                    GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()),
                                    avgTasksPerSec.get());
                            }
                            catch (IOException e) {
                                X.error("Failed to output to a file", e);
                            }
                        }

                        return null;
                    }
                });

                try {
                    // Long arithmetic: "duration * 1000" overflows int before it is widened to long.
                    runTest(g, threadCnt, taskCnt, duration * 1000L, execCnt);

                    X.println("All done, stopping.");
                }
                finally {
                    // Stop the collector even if the test run itself failed.
                    collectorFut.cancel();
                }
            }
            finally {
                G.stopAll(true);
            }
        }
        finally {
            fileLock.close();
        }
    }

    /**
     * Runs the actual load test.
     *
     * @param g Grid.
     * @param threadCnt Number of threads.
     * @param taskCnt Number of tasks.
     * @param dur Test duration in milliseconds; a non-positive value means no time limit.
     * @param iterCntr Iteration counter.
     */
    private static void runTest(final Ignite g, int threadCnt, int taskCnt, long dur,
        final LongAdder iterCntr) {
        final Semaphore sem = new Semaphore(taskCnt);

        final IgniteInClosure<IgniteFuture<?>> lsnr = new CI1<IgniteFuture<?>>() {
            @Override public void apply(IgniteFuture<?> t) {
                sem.release();
            }
        };

        GridLoadTestUtils.runMultithreadedInLoop(new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                sem.acquire();

                boolean submitted = false;

                try {
                    IgniteCompute comp = g.compute().enableAsync();

                    comp.execute(GridJobExecutionLoadTestTask.class, null);

                    ComputeTaskFuture<Object> f = comp.future();

                    f.listenAsync(lsnr);

                    submitted = true;
                }
                finally {
                    // The listener returns the permit only once it is attached; if submission
                    // failed before that point, return the permit here so it is not lost.
                    if (!submitted)
                        sem.release();
                }

                iterCntr.increment();

                return null;
            }
        }, threadCnt, dur > 0 ? dur : Long.MAX_VALUE);
    }

    /**
     * Empty task (spawns one empty job).
     */
    private static class GridJobExecutionLoadTestTask implements ComputeTask<Object, Object> {
        /** {@inheritDoc} */
        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg)
            throws IgniteCheckedException {
            return F.asMap(new GridJobExecutionLoadTestJob(), subgrid.get(0));
        }

        /** {@inheritDoc} */
        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException {
            return REDUCE;
        }

        /** {@inheritDoc} */
        @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
            return null;
        }
    }

    /**
     * Empty job.
     */
    private static class GridJobExecutionLoadTestJob implements ComputeJob {
        /** {@inheritDoc} */
        @Override public Object execute() throws IgniteCheckedException {
            return null;
        }

        /** {@inheritDoc} */
        @Override public void cancel() {
            // No-op.
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTest.java new file mode 100644 index 0000000..6953271 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.ignite.loadtests.job;

import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.springframework.beans.factory.*;
import org.springframework.context.support.*;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * Tests task/job behavior under load.
 */
public class GridJobLoadTest implements Runnable {
    /** Test configuration directory. */
    private static final File TEST_CONF_DIR;

    /**
     * Resolves the test configuration directory.
     */
    static {
        try {
            TEST_CONF_DIR = new File(U.resolveGridGainUrl("/modules/core/src/test/config/job-loadtest").toURI());
        }
        catch (URISyntaxException e) {
            throw new RuntimeException("Failed to initialize directory.", e);
        }
    }

    /** File with test configuration. */
    private static final File TEST_CONFIGURATION_FILE = new File(TEST_CONF_DIR, "job-loadtest.properties");

    /** File with client nodes configuration. */
    private static final File CLIENT_NODE_CONFIGURATION = new File(TEST_CONF_DIR, "client.xml");

    /** File with server nodes configuration. */
    private static final File SERVER_NODE_CONFIGURATION = new File(TEST_CONF_DIR, "server.xml");

    /**
     * Main method.
     *
     * @param args Command-line parameters.
     * @throws Exception if test start failed.
     */
    public static void main(String[] args) throws Exception {
        new GridJobLoadTest().run();
    }

    /** Number of client nodes to run. */
    private int clientNodes;

    /** Number of server nodes to run. */
    private int srvNodes;

    /** Number of submitter threads per client node. */
    private int threadsPerClient;

    /** Parameters for executing jobs. */
    private GridJobLoadTestParams taskParams;

    /** Submission/cancel ratio for submitting threads. */
    private int cancelRate;

    /** Time to sleep between task submissions. */
    private long submitDelay;

    /** Number of nodes running inside this test. */
    private int runningNodes;

    /**
     * Generate new node number.
     *
     * @return a client number unique within this test run.
     */
    private int getNextNodeNum() {
        return ++runningNodes;
    }

    /**
     * Loads test configuration.
     *
     * @throws Exception if configuration is unavailable or broken.
     */
    private void loadTestConfiguration() throws Exception {
        assert TEST_CONFIGURATION_FILE.isFile();

        Properties p = new Properties();

        try (InputStream in = new FileInputStream(TEST_CONFIGURATION_FILE)) {
            p.load(in);
        }

        clientNodes = Integer.parseInt(requiredProperty(p, "client.nodes.count"));
        srvNodes = Integer.parseInt(requiredProperty(p, "server.nodes.count"));
        threadsPerClient = Integer.parseInt(requiredProperty(p, "threads.per.client"));
        cancelRate = Integer.parseInt(requiredProperty(p, "cancel.rate"));
        submitDelay = Long.parseLong(requiredProperty(p, "submit.delay"));

        taskParams = new GridJobLoadTestParams(
            Integer.parseInt(requiredProperty(p, "jobs.count")),
            Integer.parseInt(requiredProperty(p, "jobs.test.duration")),
            Integer.parseInt(requiredProperty(p, "jobs.test.completion.delay")),
            Double.parseDouble(requiredProperty(p, "jobs.failure.probability"))
        );
    }

    /**
     * Reads a mandatory property, failing with a message that names the missing key instead of
     * the bare {@code NumberFormatException: null} a missing value would otherwise produce.
     *
     * @param props Loaded properties.
     * @param key Property name.
     * @return Property value with surrounding whitespace removed.
     * @throws IllegalArgumentException If the property is not defined.
     */
    private static String requiredProperty(Properties props, String key) {
        String val = props.getProperty(key);

        if (val == null)
            throw new IllegalArgumentException("Missing required property '" + key + "' in " +
                TEST_CONFIGURATION_FILE);

        return val.trim();
    }

    /** {@inheritDoc} */
    @Override public void run() {
        List<Ignite> clientIgnites = runGrid();

        assert clientIgnites.size() == clientNodes;

        int threadsCnt = clientNodes * threadsPerClient;

        Executor e = Executors.newFixedThreadPool(threadsCnt);

        for (Ignite ignite : clientIgnites) {
            for (int j = 0; j < threadsPerClient; j++)
                e.execute(new GridJobLoadTestSubmitter(ignite, taskParams, cancelRate, submitDelay));
        }
    }

    /**
     * Run all grid nodes as defined in test configuration.
     *
     * @return list of run nodes.
     */
    private List<Ignite> runGrid() {
        try {
            loadTestConfiguration();

            // Sized only after the configuration is loaded; before that clientNodes is still 0.
            List<Ignite> clientIgnites = new ArrayList<>(clientNodes);

            for (int i = 0; i < srvNodes; i++)
                startNode("server", SERVER_NODE_CONFIGURATION);

            // Start clients in the second order to cache a client node in GridGain.
            for (int i = 0; i < clientNodes; i++)
                clientIgnites.add(startNode("client", CLIENT_NODE_CONFIGURATION));

            return clientIgnites;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts new grid node.
     *
     * @param gridName name of new node.
     * @param springCfg file with spring configuration to use for this node.
     * @return a grid instance local to new node {@link org.apache.ignite.Ignition#start(org.apache.ignite.configuration.IgniteConfiguration)}.
     * @throws Exception if node run failed.
     */
    protected Ignite startNode(String gridName, File springCfg) throws Exception {
        assert springCfg != null;

        ListableBeanFactory springCtx = new FileSystemXmlApplicationContext(
            "file:///" + springCfg.getAbsolutePath());

        Map<?, ?> cfgMap = springCtx.getBeansOfType(IgniteConfiguration.class);

        assert cfgMap != null;
        assert !cfgMap.isEmpty();

        // The first configuration bean found in the Spring context is used.
        IgniteConfiguration cfg = (IgniteConfiguration)cfgMap.values().iterator().next();

        cfg.setGridName(gridName + "-" + getNextNodeNum());

        return G.start(cfg);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestJob.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestJob.java new file mode 100644 index 0000000..3c8e8f8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestJob.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.loadtests.job;

import org.apache.ignite.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.resources.*;

import java.util.*;

import static java.lang.Thread.*;

/**
 * Job for load test.
 */
public class GridJobLoadTestJob implements ComputeJob {
    /** Length of the sequence emitted into session attributes. */
    private static final int EMIT_SEQUENCE_LENGTH = 10;

    /** Flag indicating whether this job should emit sequence into session attributes. */
    private final boolean emitAttrs;

    /** Probability of failure. */
    private final double failProbability;

    /** Duration between job start and random failure check. */
    private final long executionDuration;

    /** Duration between failure check and returning from {@link GridJobLoadTestJob#execute()}. */
    private final int completionDelay;

    /** Logger. */
    @IgniteLoggerResource
    private IgniteLogger log;

    /** Ignite instance. */
    @IgniteInstanceResource
    private Ignite ignite;

    /** Job context. */
    @IgniteJobContextResource
    private ComputeJobContext cntx;

    /** Task session. */
    @IgniteTaskSessionResource
    private ComputeTaskSession taskSes;

    /**
     * @param emitAttrs if {@code true} then this work should emit number sequence into session attribute
     * @param failProbability Probability of failure.
     * @param executionDuration Duration between job start and random failure check.
     * @param completionDelay Upper bound of the random delay between failure check and returning from
     *      {@link GridJobLoadTestJob#execute()}; a non-positive value means no delay.
     */
    public GridJobLoadTestJob(boolean emitAttrs, double failProbability, long executionDuration,
        int completionDelay) {
        this.emitAttrs = emitAttrs;
        this.failProbability = failProbability;
        this.executionDuration = executionDuration;
        this.completionDelay = completionDelay;
    }

    /**{@inheritDoc}*/
    @Override public void cancel() {
        // NOTE(review): this interrupts whichever thread invokes cancel(), which is not
        // necessarily the thread running execute() - confirm this is the intended behavior.
        Thread.currentThread().interrupt();
    }

    /**
     * Runs the job body, then fails with probability {@code failProbability} or sleeps for a random
     * completion delay. Interruption is treated as cancellation and the method returns normally.
     *
     * @return Always {@code 1}.
     * @throws IgniteCheckedException Declared by the interface; not thrown by this implementation.
     */
    @Override public Integer execute() throws IgniteCheckedException {
        try {
            if (log.isInfoEnabled())
                log.info("Job started " + getJobInfo());

            doJob();

            Random rnd = new Random();

            // nextDouble() is in [0, 1): strict comparison makes probability 0 mean "never fail"
            // while probability 1 still means "always fail".
            if (rnd.nextDouble() < failProbability) {
                if (log.isInfoEnabled())
                    log.info("Failing job " + getJobInfo());

                throw new RuntimeException("Task failure simulation");
            }

            // Random.nextInt(bound) rejects non-positive bounds, so skip the delay in that case.
            if (completionDelay > 0)
                sleep(rnd.nextInt(completionDelay));

            if (log.isInfoEnabled())
                log.info("Job is completing normally " + getJobInfo());
        }
        catch (InterruptedException ignored) {
            if (log.isDebugEnabled())
                log.debug("Job was cancelled " + getJobInfo());

            // Let the method return normally.
        }

        return 1;
    }

    /**
     * Performs job actions, depending on {@code emitAttrs} and {@code executionDuration} field values:
     * an emitting job publishes the sequence into session attributes one element at a time, any other
     * job waits and then checks that the attributes it sees form a prefix of that sequence.
     *
     * @throws InterruptedException if task was cancelled during job execution.
     */
    @SuppressWarnings("BusyWait")
    private void doJob() throws InterruptedException {
        if (emitAttrs) {
            for (int i = 0; i < EMIT_SEQUENCE_LENGTH; i++) {
                try {
                    taskSes.setAttribute(String.valueOf(i), i);
                }
                catch (IgniteCheckedException e) {
                    log.error("Set attribute failed.", e);
                }

                sleep(executionDuration);
            }
        }
        else {
            sleep(executionDuration);

            Map<?, ?> attrs = taskSes.getAttributes();

            boolean valMissed = false;

            for (int i = 0; i < EMIT_SEQUENCE_LENGTH; i++) {
                Integer val = (Integer) attrs.get(String.valueOf(i));

                // We shouldn't run in situation when some elements emitted before are missed and the current exists.
                assert ! (valMissed && val != null) :
                    "Inconsistent session attribute set was received [missedAttribute=" + i +
                    ", jobId=" + cntx.getJobId() + ", attrs=" + attrs + ", nodeId=" +
                    ignite.configuration().getNodeId() + "]";

                valMissed = (val == null);
            }
        }
    }

    /**
     * Gives job description in standard log format.
     *
     * @return String with current job representation.
     */
    private String getJobInfo() {
        return "[taskId=" + taskSes.getId() + ", jobId=" + cntx.getJobId() + ", nodeId=" +
            ignite.configuration().getNodeId() + "]";
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestParams.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestParams.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestParams.java new file mode 100644 index 0000000..293dd73 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestParams.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Holder of load test task parameters. All values are fixed at construction time.
 */
public class GridJobLoadTestParams {
    /** How many jobs are to be spawned. */
    private final int jobsCnt;

    /** Time between job start and the failure check. */
    private final long executionDuration;

    /** Time between the failure check and job completion. */
    private final int completionDelay;

    /** Probability with which a job failure is simulated. */
    private final double jobFailureProbability;

    /**
     * Creates the parameter set.
     *
     * @param jobsCnt Number of jobs to be spawned.
     * @param executionDuration Duration between job start and failure check.
     * @param completionDelay Duration between failure check and job completion.
     * @param jobFailureProbability Probability of simulated job failure.
     */
    public GridJobLoadTestParams(
        int jobsCnt,
        long executionDuration,
        int completionDelay,
        double jobFailureProbability
    ) {
        this.jobFailureProbability = jobFailureProbability;
        this.completionDelay = completionDelay;
        this.executionDuration = executionDuration;
        this.jobsCnt = jobsCnt;
    }

    /**
     * @return Number of jobs to be spawned.
     */
    public int getJobsCount() {
        return this.jobsCnt;
    }

    /**
     * @return Duration between job start and failure check.
     */
    public long getExecutionDuration() {
        return this.executionDuration;
    }

    /**
     * @return Duration between failure check and job completion.
     */
    public int getCompletionDelay() {
        return this.completionDelay;
    }

    /**
     * @return Probability of simulated job failure.
     */
    public double getJobFailureProbability() {
        return this.jobFailureProbability;
    }
}