http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java new file mode 100644 index 0000000..dd4c09e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.ignite.loadtests.communication;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.spi.communication.*;
import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.gridgain.testframework.*;
import org.gridgain.testframework.junits.common.*;
import org.jdk8.backport.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;

/**
 * Benchmark of {@link GridIoManager} message exchange between two grids started in the same JVM.
 * <p>
 * Grid 0 acts as the sender and grid 1 echoes every received message back to it. The grids are
 * started once for the whole class, so every test removes the listeners it registered before
 * returning; otherwise a later test using the same topic would have its messages handled twice.
 */
public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
    /** Number of messages that may be in flight at the same time. */
    public static final int CONCUR_MSGS = 10 * 1024;

    /** Number of sender threads. */
    private static final int THREADS = 2;

    /** Duration of every test, in milliseconds. */
    private static final long TEST_TIMEOUT = 3 * 60 * 1000;

    /** Delay before the first report and period between reports, in milliseconds. */
    private static final long REPORT_PERIOD = 10000;

    /** How often a thread waiting for a reply re-checks the stop flag, in milliseconds. */
    private static final long REPLY_POLL_MS = 500;

    /** IP finder shared by the discovery SPIs of all grids started by this test. */
    private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        startGridsMultiThreaded(2);
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        stopAllGrids();
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration c = super.getConfiguration(gridName);

        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();

        discoSpi.setIpFinder(ipFinder);

        c.setDiscoverySpi(discoSpi);

        c.setCommunicationSpi(getCommunication());

        return c;
    }

    /**
     * Builds a string of the given length consisting of the repeating digits {@code 0..9}.
     *
     * @param len Length, must be positive.
     * @return Test string.
     */
    private static String generateTestString(int len) {
        assert len > 0;
        SB sb = new SB();

        for (int i = 0; i < len; i++)
            sb.a(Character.forDigit(i % 10, 10));

        return sb.toString();
    }

    /**
     * Creates a listener that sends every received message back to the given node.
     *
     * @param rcv IO manager used to send the echo.
     * @param sndNode Node the echo is sent to.
     * @param topic Topic the echo is sent on.
     * @return Echo listener.
     */
    private GridMessageListener echoListener(final GridIoManager rcv, final ClusterNode sndNode,
        final Object topic) {
        return new GridMessageListener() {
            @Override public void onMessage(UUID nodeId, Object msg) {
                try {
                    rcv.send(sndNode, topic, (GridTcpCommunicationMessageAdapter)msg, PUBLIC_POOL);
                }
                catch (IgniteCheckedException e) {
                    error("Failed to send message.", e);
                }
            }
        };
    }

    /**
     * Prints the number of messages counted since the previous report and resets the counter.
     *
     * @param msgCntr Message counter.
     * @param prevTs Timestamp of the previous report.
     * @return Timestamp of this report.
     */
    private static long report(LongAdder msgCntr, long prevTs) {
        long newTs = System.currentTimeMillis();
        long qrys = msgCntr.sumThenReset();

        long time = newTs - prevTs;

        // Guard against a zero interval: an ArithmeticException would kill the reporter thread.
        long qps = time > 0 ? qrys * 1000 / time : 0;

        X.println("Communication benchmark [qps=" + qps +
            ", executed=" + qrys + ", time=" + time + ']');

        return newTs;
    }

    /**
     * Starts a timer that periodically reports the message rate.
     *
     * @param msgCntr Message counter.
     * @return Started timer, the caller is responsible for cancelling it.
     */
    private static Timer startReporter(final LongAdder msgCntr) {
        Timer t = new Timer("results-reporter");

        t.schedule(new TimerTask() {
            private long ts = System.currentTimeMillis();

            @Override public void run() {
                ts = report(msgCntr, ts);
            }
        }, REPORT_PERIOD, REPORT_PERIOD);

        return t;
    }

    /**
     * Waits for the latch, periodically checking the stop flag so that a lost reply
     * cannot block test shutdown forever.
     *
     * @param latch Latch counted down when the reply arrives.
     * @param finish Stop flag.
     * @return {@code True} if the reply arrived, {@code false} if the test is stopping.
     * @throws InterruptedException If the waiting thread was interrupted.
     */
    private static boolean awaitReply(CountDownLatch latch, AtomicBoolean finish) throws InterruptedException {
        while (!latch.await(REPLY_POLL_MS, TimeUnit.MILLISECONDS)) {
            if (finish.get())
                return false;
        }

        return true;
    }

    /**
     * Measures throughput: {@link #THREADS} senders keep up to {@link #CONCUR_MSGS} messages in flight.
     *
     * @throws Exception If failed.
     */
    @SuppressWarnings("deprecation")
    public void testThroughput() throws Exception {
        final GridKernal sndKernal = (GridKernal)grid(0);
        final GridKernal rcvKernal = (GridKernal)grid(1);

        final ClusterNode sndNode = sndKernal.localNode();
        final ClusterNode rcvNode = rcvKernal.localNode();

        final GridIoManager snd = sndKernal.context().io();
        final GridIoManager rcv = rcvKernal.context().io();

        info("Senders: " + THREADS);
        info("Messages: " + CONCUR_MSGS);

        final Semaphore sem = new Semaphore(CONCUR_MSGS);
        final LongAdder msgCntr = new LongAdder();

        final String topic = "test-topic";

        GridMessageListener echoLsnr = echoListener(rcv, sndNode, topic);

        GridMessageListener replyLsnr = new GridMessageListener() {
            @Override public void onMessage(UUID nodeId, Object msg) {
                msgCntr.increment();

                sem.release();
            }
        };

        rcv.addMessageListener(topic, echoLsnr);
        snd.addMessageListener(topic, replyLsnr);

        final AtomicBoolean finish = new AtomicBoolean();

        Timer t = startReporter(msgCntr);

        try {
            IgniteFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
                @Override public Object call() throws Exception {
                    try {
                        IgniteUuid msgId = IgniteUuid.randomUuid();

                        while (!finish.get()) {
                            sem.acquire();

                            snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
                        }
                    }
                    catch (IgniteCheckedException e) {
                        error("Message send failed", e);
                    }
                    catch (InterruptedException ignored) {
                        // Stop sending, but keep the interrupt visible to the caller.
                        Thread.currentThread().interrupt();
                    }

                    return null;
                }
            }, THREADS, "send-thread");

            try {
                Thread.sleep(TEST_TIMEOUT);
            }
            finally {
                finish.set(true);

                // Wake up senders that may be blocked on the semaphore.
                sem.release(CONCUR_MSGS * 2);
            }

            f.get();
        }
        finally {
            t.cancel();

            // Grids are shared between tests: do not leave listeners behind.
            snd.removeMessageListener(topic, replyLsnr);
            rcv.removeMessageListener(topic, echoLsnr);
        }
    }

    /**
     * Measures round-trip rate of a single sender that waits for every reply before sending again.
     *
     * @throws Exception If failed.
     */
    @SuppressWarnings("deprecation")
    public void testLatency() throws Exception {
        final GridKernal sndKernal = (GridKernal)grid(0);
        final GridKernal rcvKernal = (GridKernal)grid(1);

        final ClusterNode sndNode = sndKernal.localNode();
        final ClusterNode rcvNode = rcvKernal.localNode();

        final GridIoManager snd = sndKernal.context().io();
        final GridIoManager rcv = rcvKernal.context().io();

        final LongAdder msgCntr = new LongAdder();

        final Integer topic = 1;

        final Map<IgniteUuid, CountDownLatch> map = new ConcurrentHashMap8<>();

        GridMessageListener echoLsnr = echoListener(rcv, sndNode, topic);

        GridMessageListener replyLsnr = new GridMessageListener() {
            @Override public void onMessage(UUID nodeId, Object msg) {
                IgniteUuid id = ((GridTestMessage)msg).id();

                // Tolerate unknown IDs instead of failing inside the communication thread.
                CountDownLatch latch = id != null ? map.get(id) : null;

                if (latch != null)
                    latch.countDown();
            }
        };

        rcv.addMessageListener(topic, echoLsnr);
        snd.addMessageListener(topic, replyLsnr);

        final AtomicBoolean finish = new AtomicBoolean();

        Timer t = startReporter(msgCntr);

        try {
            IgniteFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
                @Override public Object call() throws Exception {
                    try {
                        IgniteUuid msgId = IgniteUuid.randomUuid();

                        while (!finish.get()) {
                            CountDownLatch latch = new CountDownLatch(1);

                            map.put(msgId, latch);

                            snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);

                            if (!awaitReply(latch, finish))
                                break;

                            msgCntr.increment();
                        }
                    }
                    catch (IgniteCheckedException e) {
                        error("Message send failed", e);
                    }
                    catch (InterruptedException ignored) {
                        // Stop sending, but keep the interrupt visible to the caller.
                        Thread.currentThread().interrupt();
                    }

                    return null;
                }
            }, 1, "send-thread");

            try {
                Thread.sleep(TEST_TIMEOUT);
            }
            finally {
                finish.set(true);
            }

            f.get();
        }
        finally {
            t.cancel();

            // Grids are shared between tests: do not leave listeners behind.
            snd.removeMessageListener(topic, replyLsnr);
            rcv.removeMessageListener(topic, echoLsnr);
        }
    }

    /**
     * Alternates periods of high load (all senders active) and low load (senders paused) while
     * separate probe threads measure the response time of individual messages.
     *
     * @throws Exception If failed.
     */
    @SuppressWarnings("deprecation")
    public void testVariableLoad() throws Exception {
        final GridKernal sndKernal = (GridKernal)grid(0);
        final GridKernal rcvKernal = (GridKernal)grid(1);

        final ClusterNode sndNode = sndKernal.localNode();
        final ClusterNode rcvNode = rcvKernal.localNode();

        final GridIoManager snd = sndKernal.context().io();
        final GridIoManager rcv = rcvKernal.context().io();

        info("Senders: " + THREADS);
        info("Messages: " + CONCUR_MSGS);

        final Semaphore sem = new Semaphore(CONCUR_MSGS);
        final LongAdder msgCntr = new LongAdder();

        final String topic = "test-topic";

        final Map<IgniteUuid, CountDownLatch> latches = new ConcurrentHashMap8<>();

        GridMessageListener echoLsnr = echoListener(rcv, sndNode, topic);

        GridMessageListener replyLsnr = new GridMessageListener() {
            @Override public void onMessage(UUID nodeId, Object msg) {
                msgCntr.increment();

                IgniteUuid id = ((GridTestMessage)msg).id();

                // Remove rather than get, otherwise the map grows for the whole test duration.
                CountDownLatch latch = id != null ? latches.remove(id) : null;

                if (latch != null)
                    latch.countDown();
                else
                    // Only load messages acquired a permit; probe replies must not add extra ones.
                    sem.release();
            }
        };

        rcv.addMessageListener(topic, echoLsnr);
        snd.addMessageListener(topic, replyLsnr);

        final AtomicBoolean finish = new AtomicBoolean();
        final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();

        try {
            IgniteFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
                @Override public Object call() throws Exception {
                    while (!finish.get()) {
                        CountDownLatch latch = latchRef.get();

                        // Senders are parked here while the dispatcher holds a low load period.
                        if (latch != null)
                            U.await(latch);

                        IgniteUuid msgId = IgniteUuid.randomUuid();

                        sem.acquire();

                        snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
                    }

                    return null;
                }
            }, THREADS, "send-thread");

            IgniteFuture<?> f1 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
                private long ts = System.currentTimeMillis();

                @Override public Object call() throws Exception {
                    try {
                        while (!finish.get()) {
                            info(U.nl() + ">>>" + U.nl() + ">>> High load." + U.nl() + ">>>");

                            U.sleep(15 * 1000);

                            ts = report(msgCntr, ts);

                            info(U.nl() + ">>>" + U.nl() + ">>> Low load." + U.nl() + ">>>");

                            CountDownLatch latch = new CountDownLatch(1);

                            try {
                                // Senders pause until the latch is counted down.
                                latchRef.set(latch);

                                U.sleep(7 * 1000);

                                ts = report(msgCntr, ts);
                            }
                            finally {
                                latch.countDown();
                            }
                        }
                    }
                    catch (IgniteCheckedException e) {
                        error("Load dispatcher failed", e);
                    }

                    return null;
                }
            }, 1, "load-dispatcher");

            IgniteFuture<?> f2 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
                @Override public Object call() throws Exception {
                    while (!finish.get()) {
                        U.sleep(1000);

                        IgniteUuid msgId = IgniteUuid.randomUuid();
                        CountDownLatch latch = new CountDownLatch(1);

                        latches.put(msgId, latch);

                        // Start the clock before sending so that send time is part of the response time.
                        long start = System.currentTimeMillis();

                        snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);

                        if (!awaitReply(latch, finish)) {
                            latches.remove(msgId);

                            break;
                        }

                        info("Response time: " + (System.currentTimeMillis() - start));
                    }

                    return null;
                }
            }, THREADS, "low-loader");

            try {
                Thread.sleep(TEST_TIMEOUT);
            }
            finally {
                finish.set(true);

                // Wake up senders that may be blocked on the semaphore.
                sem.release(CONCUR_MSGS * 2);
            }

            f.get();
            f1.get();
            f2.get();
        }
        finally {
            // Grids are shared between tests: do not leave listeners behind.
            snd.removeMessageListener(topic, replyLsnr);
            rcv.removeMessageListener(topic, echoLsnr);
        }
    }

    /**
     * Creates the communication SPI used by every grid of this benchmark.
     *
     * @return SPI instance.
     */
    private CommunicationSpi getCommunication() {
        TcpCommunicationSpi spi = new TcpCommunicationSpi();

        spi.setTcpNoDelay(true);
        // NOTE(review): -1 presumably disables the shared memory endpoint - confirm against the SPI docs.
        spi.setSharedMemoryPort(-1);
        spi.setConnectionBufferSize(0);

        info("Comm SPI: " + spi);

        return spi;
    }

    /** {@inheritDoc} */
    @Override protected long getTestTimeout() {
        // Leave one extra minute on top of the measured period for start-up and shutdown.
        return TEST_TIMEOUT + 60 * 1000;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTcpCommunicationBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTcpCommunicationBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTcpCommunicationBenchmark.java new file mode 100644 index 0000000..8c06610 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTcpCommunicationBenchmark.java @@ -0,0 +1,586 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.communication; +// +//import org.gridgain.grid.*; +//import org.gridgain.grid.lang.*; +//import org.gridgain.grid.lang.utils.*; +//import org.gridgain.grid.spi.*; +//import org.gridgain.grid.spi.communication.*; +//import org.gridgain.grid.spi.communication.tcp.*; +//import org.gridgain.grid.typedef.*; +//import org.gridgain.grid.typedef.internal.*; +//import org.gridgain.testframework.*; +//import org.gridgain.testframework.junits.*; +//import org.gridgain.testframework.junits.spi.*; +// +//import java.util.*; +//import java.util.concurrent.*; +//import java.util.concurrent.atomic.*; +// +///** */ +//@GridSpiTest(spi = GridTcpCommunicationSpi.class, group = "TCP communication SPI benchmark.") +//public class GridTcpCommunicationBenchmark extends GridSpiAbstractTest<GridTcpCommunicationSpi> { +// /** */ +// public static final int CONCUR_MSGS = 10 * 1024; +// +// /** */ +// private static final int THREADS = 1; +// +// /** */ +// private static final long TEST_TIMEOUT = 3 * 60 * 1000; +// +// /** */ +// private final Collection<GridTestResources> spiRsrcs = new ArrayList<>(); +// +// /** */ +// private final Map<UUID, GridCommunicationSpi> spis = new HashMap<>(); +// +// /** */ +// private final Collection<GridNode> nodes = new ArrayList<>(); +// +// /** +// * Disable automatic test SPI start. 
+// */ +// public GridTcpCommunicationBenchmark() { +// super(false); +// } +// +// /** {@inheritDoc} */ +// @Override protected void beforeTest() throws Exception { +// Map<GridNode, GridSpiTestContext> ctxs = new HashMap<>(); +// +// for (int i = 0; i < 2; i++) { +// GridCommunicationSpi spi = getCommunication(); +// +// GridTestResources rsrcs = new GridTestResources(); +// +// GridTestNode node = new GridTestNode(rsrcs.getNodeId()); +// +// GridSpiTestContext ctx = initSpiContext(); +// +// ctx.setLocalNode(node); +// +// spiRsrcs.add(rsrcs); +// +// rsrcs.inject(spi); +// +// node.setAttributes(spi.getNodeAttributes()); +// +// nodes.add(node); +// +// spi.spiStart(getTestGridName() + (i + 1)); +// +// spis.put(rsrcs.getNodeId(), spi); +// +// spi.onContextInitialized(ctx); +// +// ctxs.put(node, ctx); +// } +// +// // For each context set remote nodes. +// for (Map.Entry<GridNode, GridSpiTestContext> e : ctxs.entrySet()) { +// for (GridNode n : nodes) { +// if (!n.equals(e.getKey())) +// e.getValue().remoteNodes().add(n); +// } +// } +// } +// +// /** {@inheritDoc} */ +// @Override protected void afterTest() throws Exception { +// for (GridCommunicationSpi spi : spis.values()) { +// spi.setListener(null); +// +// spi.spiStop(); +// } +// +// for (GridTestResources rsrcs : spiRsrcs) +// rsrcs.stopThreads(); +// } +// +// /** +// * @param len Length. +// * @return Test string. +// */ +// private static String generateTestString(int len) { +// assert len > 0; +// SB sb = new SB(); +// +// for (int i = 0; i < len; i++) +// sb.a(Character.forDigit(i % 10, 10)); +// +// return sb.toString(); +// } +// +// /** +// * @throws Exception If failed. 
+// */ +// @SuppressWarnings("deprecation") +// public void testThroughput() throws Exception { +// assert spis.size() == 2; +// assert nodes.size() == 2; +// +// Iterator<GridNode> it = nodes.iterator(); +// +// final GridNode sndNode = it.next(); +// final GridNode rcvNode = it.next(); +// +// final GridCommunicationSpi sndComm = spis.get(sndNode.id()); +// final GridCommunicationSpi rcvComm = spis.get(rcvNode.id()); +// +// final String testStr = generateTestString(66); +// +// info("Test string length: " + testStr.length()); +// info("Senders: " + THREADS); +// info("Messages: " + CONCUR_MSGS); +// +// final Semaphore sem = new Semaphore(CONCUR_MSGS); +// final LongAdder msgCntr = new LongAdder(); +// +// rcvComm.setListener(new GridCommunicationListener() { +// @Override public void onMessage(UUID nodeId, byte[] msg, GridAbsClosure msgC) { +// try { +// byte[] res = U.join(U.intToBytes(msg.length), msg); +// +// rcvComm.sendMessage(sndNode, res, 0, res.length); +// } +// catch (GridSpiException e) { +// log.error("Message echo failed.", e); +// } +// finally { +// msgC.apply(); +// } +// } +// }); +// +// sndComm.setListener(new GridCommunicationListener() { +// @Override public void onMessage(UUID nodeId, byte[] msg, GridAbsClosure msgC) { +// msgCntr.increment(); +// +// sem.release(); +// +// msgC.apply(); +// } +// }); +// +// Timer t = new Timer("results-reporter"); +// +// t.schedule(new TimerTask() { +// private long ts = System.currentTimeMillis(); +// +// @Override public void run() { +// long newTs = System.currentTimeMillis(); +// long qrys = msgCntr.sumThenReset(); +// +// long time = newTs - ts; +// +// X.println("Communication benchmark [qps=" + qrys * 1000 / time + +// ", executed=" + qrys + ", time=" + time + ']'); +// +// ts = newTs; +// } +// }, 10000, 10000); +// +// final AtomicBoolean finish = new AtomicBoolean(); +// +// GridFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { +// @Override public Object call() 
throws Exception { +// try { +// while (!finish.get()) { +// GridUuid msgId = GridUuid.randomUuid(); +// +// sem.acquire(); +// +// // Loading message with additional data, to get results, +// // comparable with IoManager benchmark. +// GridTuple2<byte[], Integer> t = getTestResources().getMarshaller(). +// marshalNoCopy(new GridTestMessage(msgId, testStr), 4); +// +// byte[] buf = t.get1(); +// int len = t.get2(); +// +// U.intToBytes(len - 4, buf, 0); +// +// sndComm.sendMessage(rcvNode, buf, 0, len); +// } +// } +// catch (IgniteCheckedException e) { +// X.println("Message send failed", e); +// } +// catch (InterruptedException ignored) { +// // No-op. +// } +// +// return null; +// } +// }, THREADS, "send-thread"); +// +// Thread.sleep(TEST_TIMEOUT); +// +// finish.set(true); +// +// sem.release(CONCUR_MSGS * 2); +// +// t.cancel(); +// +// f.get(); +// } +// +// /** +// * @throws Exception If failed. +// */ +// @SuppressWarnings("deprecation") +// public void testLatency() throws Exception { +// assert spis.size() == 2; +// assert nodes.size() == 2; +// +// Iterator<GridNode> it = nodes.iterator(); +// +// final GridNode sndNode = it.next(); +// final GridNode rcvNode = it.next(); +// +// final GridCommunicationSpi sndComm = spis.get(sndNode.id()); +// final GridCommunicationSpi rcvComm = spis.get(rcvNode.id()); +// +// final String testStr = generateTestString(66); +// +// info("Test string length: " + testStr.length()); +// +// final LongAdder msgCntr = new LongAdder(); +// +// final Map<GridUuid, CountDownLatch> map = new ConcurrentHashMap8<>(); +// +// rcvComm.setListener(new GridCommunicationListener() { +// @Override public void onMessage(UUID nodeId, byte[] msg, GridAbsClosure msgC) { +// try { +// byte[] res = U.join(U.intToBytes(msg.length), msg); +// +// rcvComm.sendMessage(sndNode, res, 0, res.length); +// } +// catch (GridSpiException e) { +// log.error("Message echo failed.", e); +// } +// finally { +// msgC.apply(); +// } +// } +// }); +// +// 
final ClassLoader clsLdr = getClass().getClassLoader(); +// +// sndComm.setListener(new GridCommunicationListener() { +// @Override public void onMessage(UUID nodeId, byte[] msg, GridAbsClosure msgC) { +// try { +// GridTestMessage testMsg = getTestResources().getMarshaller().unmarshal(msg, clsLdr); +// +// map.get(testMsg.id()).countDown(); +// } +// catch (IgniteCheckedException e) { +// U.error(log, "Failed to ", e); +// } +// finally { +// msgC.apply(); +// } +// } +// }); +// +// Timer t = new Timer("results-reporter"); +// +// t.schedule(new TimerTask() { +// private long ts = System.currentTimeMillis(); +// +// @Override public void run() { +// long newTs = System.currentTimeMillis(); +// long qrys = msgCntr.sumThenReset(); +// +// long time = newTs - ts; +// +// X.println("Communication benchmark [qps=" + qrys * 1000 / time + +// ", executed=" + qrys + ", time=" + time + ']'); +// +// ts = newTs; +// } +// }, 10000, 10000); +// +// final AtomicBoolean finish = new AtomicBoolean(); +// +// GridFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { +// @Override public Object call() throws Exception { +// info("Test thread started."); +// +// try { +// GridUuid msgId = GridUuid.randomUuid(); +// +// GridTuple2<byte[], Integer> t = getTestResources().getMarshaller(). +// marshalNoCopy(new GridTestMessage(msgId, testStr), 4); +// +// byte[] buf = t.get1(); +// int len = t.get2(); +// +// U.intToBytes(len - 4, buf, 0); +// +// while (!finish.get()) { +// // Loading message with additional data, to get results, +// // comparable with IoManager benchmark. +// CountDownLatch latch = new CountDownLatch(1); +// +// map.put(msgId, latch); +// +// sndComm.sendMessage(rcvNode, buf, 0, len); +// +// latch.await(); +// +// msgCntr.increment(); +// } +// } +// catch (IgniteCheckedException e) { +// X.println("Message send failed", e); +// } +// catch (InterruptedException ignored) { +// // No-op. 
+// } +// +// return null; +// } +// }, 2, "send-thread"); +// +// Thread.sleep(TEST_TIMEOUT); +// +// finish.set(true); +// +// t.cancel(); +// +// f.get(); +// } +// +// /** +// * @throws Exception If failed. +// */ +// @SuppressWarnings("deprecation") +// public void testVariableLoad() throws Exception { +// assert spis.size() == 2; +// assert nodes.size() == 2; +// +// Iterator<GridNode> it = nodes.iterator(); +// +// final GridNode sndNode = it.next(); +// final GridNode rcvNode = it.next(); +// +// final GridCommunicationSpi sndComm = spis.get(sndNode.id()); +// final GridCommunicationSpi rcvComm = spis.get(rcvNode.id()); +// +// final String testStr = generateTestString(16); +// +// info("Test string length: " + testStr.length()); +// info("Senders: " + THREADS); +// info("Messages: " + CONCUR_MSGS); +// +// final Semaphore sem = new Semaphore(CONCUR_MSGS); +// final LongAdder msgCntr = new LongAdder(); +// +// final Map<GridUuid, CountDownLatch> latches = new ConcurrentHashMap8<>(); +// +// rcvComm.setListener(new GridCommunicationListener() { +// @Override public void onMessage(UUID nodeId, byte[] msg, GridAbsClosure msgC) { +// try { +// byte[] res = U.join(U.intToBytes(msg.length), msg); +// +// rcvComm.sendMessage(sndNode, res, 0, res.length); +// } +// catch (GridSpiException e) { +// log.error("Message echo failed.", e); +// } +// finally { +// msgC.apply(); +// } +// } +// }); +// +// sndComm.setListener(new GridCommunicationListener() { +// @Override public void onMessage(UUID nodeId, byte[] buf, GridAbsClosure msgC) { +// msgCntr.increment(); +// +// sem.release(); +// +// GridTestMessage msg = null; +// +// try { +// msg = getTestResources().getMarshaller().unmarshal(buf, U.gridClassLoader()); +// } +// catch (IgniteCheckedException e) { +// U.error(log, "Failed to unmarshal message.", e); +// +// fail(); +// } +// finally { +// msgC.apply(); +// } +// +// CountDownLatch latch = latches.get(msg.id()); +// +// if (latch != null) +// 
latch.countDown(); +// } +// }); +// +// final AtomicBoolean finish = new AtomicBoolean(); +// final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(); +// +// GridFuture<?> f = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { +// @Override public Object call() throws Exception { +// while (!finish.get()) { +// CountDownLatch latch = latchRef.get(); +// +// if (latch != null) +// U.await(latch); +// +// GridUuid msgId = GridUuid.randomUuid(); +// +// sem.acquire(); +// +// // Loading message with additional data, to get results, +// // comparable with IoManager benchmark. +// GridTuple2<byte[], Integer> t = getTestResources().getMarshaller(). +// marshalNoCopy(new GridTestMessage(msgId, testStr), 4); +// +// byte[] buf = t.get1(); +// int len = t.get2(); +// +// U.intToBytes(len - 4, buf, 0); +// +// sndComm.sendMessage(rcvNode, buf, 0, len); +// } +// +// return null; +// } +// }, THREADS, "send-thread"); +// +// GridFuture<?> f1 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { +// private long ts = System.currentTimeMillis(); +// +// @Override public Object call() throws Exception { +// try { +// while (!finish.get()) { +// info(U.nl() + ">>>" + U.nl() + ">>> High load." + U.nl() + ">>>"); +// +// U.sleep(15 * 1000); +// +// reportNumbers(); +// +// info(U.nl() + ">>>" + U.nl() + ">>> Low load." + U.nl() + ">>>"); +// +// CountDownLatch latch = new CountDownLatch(1); +// +// try { +// // Here will be a pause. 
+// latchRef.set(latch); +// +// U.sleep(7 * 1000); +// +// reportNumbers(); +// } +// finally { +// latch.countDown(); +// } +// } +// } +// catch (IgniteCheckedException e) { +// X.println("Message send failed", e); +// } +// +// return null; +// } +// +// /** +// * +// */ +// void reportNumbers() { +// long newTs = System.currentTimeMillis(); +// long qrys = msgCntr.sumThenReset(); +// +// long time = newTs - ts; +// +// X.println("Communication benchmark [qps=" + qrys * 1000 / time + +// ", executed=" + qrys + ", time=" + time + ']'); +// +// ts = newTs; +// +// } +// }, 1, "load-dispatcher"); +// +// GridFuture<?> f2 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { +// @Override public Object call() throws Exception { +// while (!finish.get()) { +// U.sleep(1000); +// +// GridUuid msgId = GridUuid.randomUuid(); +// CountDownLatch latch = new CountDownLatch(1); +// +// latches.put(msgId, latch); +// +// GridTuple2<byte[], Integer> t = getTestResources().getMarshaller(). +// marshalNoCopy(new GridTestMessage(msgId, testStr), 4); +// +// byte[] buf = t.get1(); +// int len = t.get2(); +// +// U.intToBytes(len - 4, buf, 0); +// +// sndComm.sendMessage(rcvNode, buf, 0, len); +// +// long start = System.currentTimeMillis(); +// +// latch.await(); +// +// info("Response time: " + (System.currentTimeMillis() - start)); +// } +// +// return null; +// } +// }, THREADS, "low-loader"); +// +// Thread.sleep(TEST_TIMEOUT); +// +// finish.set(true); +// +// sem.release(CONCUR_MSGS * 2); +// +// f.get(); +// f1.get(); +// f2.get(); +// } +// +// /** +// * @return SPI instance. 
+// */ +// private GridCommunicationSpi getCommunication() { +// GridTcpCommunicationSpi spi = new GridTcpCommunicationSpi(); +// +// spi.setSharedMemoryPort(-1); +// spi.setTcpNoDelay(true); +// spi.setLocalAddress("127.0.0.1"); +// +// return spi; +// } +// +// /** {@inheritDoc} */ +// @Override protected long getTestTimeout() { +// return TEST_TIMEOUT + 60 * 1000; +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java new file mode 100644 index 0000000..e2c7a6a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.ignite.loadtests.communication;

import org.apache.ignite.lang.*;
import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.io.*;
import java.nio.*;

/**
 * Payload message used by the communication load tests.
 * <p>
 * The message carries an ID plus an optional string or byte array. The direct marshalling hooks
 * ({@link #writeTo(ByteBuffer)} and {@link #readFrom(ByteBuffer)}) report completion without
 * touching the buffer; only the {@link Externalizable} methods transfer the fields.
 */
class GridTestMessage extends GridTcpCommunicationMessageAdapter implements Externalizable {
    /** Message ID. */
    private IgniteUuid id;

    /** First numeric field; never assigned by this class, so it keeps its default value. */
    private long field1;

    /** Second numeric field; never assigned by this class, so it keeps its default value. */
    private long field2;

    /** String payload, may be {@code null}. */
    private String str;

    /** Binary payload, may be {@code null}. */
    private byte[] bytes;

    /**
     * Creates a message with a string payload.
     *
     * @param id Message ID.
     * @param str String.
     */
    GridTestMessage(IgniteUuid id, String str) {
        this.str = str;
        this.id = id;
    }

    /**
     * Creates a message with a binary payload.
     *
     * @param id Message ID.
     * @param bytes Bytes.
     */
    GridTestMessage(IgniteUuid id, byte[] bytes) {
        this.bytes = bytes;
        this.id = id;
    }

    /**
     * Empty constructor required by {@link Externalizable}.
     */
    public GridTestMessage() {
        // No-op.
    }

    /**
     * @return Message ID.
     */
    public IgniteUuid id() {
        return this.id;
    }

    /**
     * @return Bytes.
     */
    public byte[] bytes() {
        return this.bytes;
    }

    /**
     * @param bytes Bytes.
     */
    public void bytes(byte[] bytes) {
        this.bytes = bytes;
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        // The field order here must mirror readExternal().
        U.writeGridUuid(out, this.id);

        out.writeLong(this.field1);
        out.writeLong(this.field2);

        U.writeString(out, this.str);
        U.writeByteArray(out, this.bytes);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        // The field order here must mirror writeExternal().
        this.id = U.readGridUuid(in);

        this.field1 = in.readLong();
        this.field2 = in.readLong();

        this.str = U.readString(in);
        this.bytes = U.readByteArray(in);
    }

    /** {@inheritDoc} */
    @SuppressWarnings("CloneDoesntCallSuperClone")
    @Override public GridTcpCommunicationMessageAdapter clone() {
        // Cloning is not supported for this test message.
        throw new UnsupportedOperationException();
    }

    /** {@inheritDoc} */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public boolean writeTo(ByteBuffer buf) {
        // Nothing is written to the buffer.
        return true;
    }

    /** {@inheritDoc} */
    @Override public boolean readFrom(ByteBuffer buf) {
        // Nothing is read from the buffer.
        return true;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return 0;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java new file mode 100644 index 0000000..d6d4f12 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.cache.query.continuous.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.loadtests.util.GridLoadTestArgs.*; +import static org.gridgain.testframework.GridLoadTestUtils.*; +import static org.gridgain.testframework.GridTestUtils.*; + +/** + * Load test for {@link org.apache.ignite.cache.query.GridCacheContinuousQuery}. + */ +public class GridContinuousOperationsLoadTest { + /** + * Main method. + * + * @param args Command line arguments. + * @throws Exception If error occurs. + */ + public static void main(String[] args) throws Exception { + final String cfgPath = args.length > 0 ? 
args[0] : "examples/config/example-cache.xml"; + final String cacheName = getStringProperty(CACHE_NAME, "partitioned"); + final Integer valSize = getIntProperty(VALUE_SIZE, 1024); + final Integer threadsCnt = getIntProperty(THREADS_CNT, 8); + final Integer testDurSec = getIntProperty(TEST_DUR_SEC, 180); + + final Integer filterSkipProb = getIntProperty("GG_FILTER_SKIP_PROBABILITY", 10, new C1<Integer, String>() { + @Nullable @Override public String apply(Integer val) { + if (val < 0 || val > 100) + return "The value should be between 1 and 100."; + + return null; + } + }); + + final boolean useQry = getBooleanProperty("GG_USE_QUERIES", true); + final int bufSize = getIntProperty("GG_BUFFER_SIZE", 1); + final long timeInterval = getLongProperty("GG_TIME_INTERVAL", 0); + final int parallelCnt = getIntProperty("GG_PARALLEL_COUNT", 8); + final int keyRange = getIntProperty("GG_KEY_RANGE", 100000); + final long updSleepMs = getLongProperty("GG_UPDATE_SLEEP_MS", 0); + final long filterSleepMs = getLongProperty("GG_FILTER_SLEEP_MS", 0); + final long cbSleepMs = getLongProperty("GG_CALLBACK_SLEEP_MS", 0); + + X.println("The test will start with the following parameters:"); + + dumpProperties(System.out); + + try (Ignite ignite = Ignition.start(cfgPath)) { + final GridCache<Object, Object> cache = ignite.cache(cacheName); + + if (cache == null) + throw new IgniteCheckedException("Cache is not configured: " + cacheName); + + // Continuous query manager, used to monitor queue size. + final GridCacheContinuousQueryManager contQryMgr = + ((GridCacheAdapter)((GridCacheProxyImpl)cache).cache()).context().continuousQueries(); + + if (contQryMgr == null) + throw new IgniteCheckedException("Could not access GridCacheContinuousQueryManager"); + + final AtomicBoolean stop = new AtomicBoolean(); // Stop flag. + final AtomicLong cbCntr = new AtomicLong(); // Callback counter. + final AtomicLong updCntr = new AtomicLong(); // Update counter. 
+ + for (int i = 0; i < parallelCnt; i++) { + if (useQry) { + GridCacheContinuousQuery<Object, Object> qry = cache.queries().createContinuousQuery(); + + qry.callback(new PX2<UUID, Collection<Map.Entry<Object, Object>>>() { + @Override public boolean applyx(UUID uuid, Collection<Map.Entry<Object, Object>> entries) + throws IgniteInterruptedException { + if (cbSleepMs > 0) + U.sleep(cbSleepMs); + + cbCntr.addAndGet(entries.size()); + + return true; // Continue listening. + } + }); + + qry.filter(new PX2<Object, Object>() { + @Override public boolean applyx(Object key, Object val) throws IgniteInterruptedException { + if (filterSleepMs > 0) + U.sleep(filterSleepMs); + + return Math.random() * 100 >= filterSkipProb; + } + }); + + qry.bufferSize(bufSize); + qry.timeInterval(timeInterval); + + qry.execute(); + } + else { + ignite.events().remoteListen( + bufSize, + timeInterval, + true, + new PX2<UUID, IgniteEvent>() { + @Override + public boolean applyx(UUID uuid, IgniteEvent evt) + throws IgniteInterruptedException { + if (cbSleepMs > 0) + U.sleep(cbSleepMs); + + cbCntr.incrementAndGet(); + + return true; // Continue listening. + } + }, + new PX1<IgniteEvent>() { + @Override + public boolean applyx(IgniteEvent evt) throws IgniteInterruptedException { + if (filterSleepMs > 0) + U.sleep(filterSleepMs); + + return Math.random() * 100 >= filterSkipProb; + } + }, + EVT_CACHE_OBJECT_PUT + ); + } + } + + // Start collector thread. + startDaemon(new Runnable() { + @Override public void run() { + try { + while (!stop.get() && !Thread.currentThread().isInterrupted()) { + long cbCntr0 = cbCntr.get(); + long updCntr0 = updCntr.get(); + + U.sleep(1000); + + long cbDelta = cbCntr.get() - cbCntr0; + long updDelta = updCntr.get() - updCntr0; + + X.println("Stats [entriesPerSec=" + cbDelta + + ", updatesPerSec=" + updDelta + ']'); + } + } + catch (IgniteInterruptedException ignored) { + // No-op. 
+ } + } + }); + + X.println("Starting " + threadsCnt + " generator thread(s)."); + + // Start generator threads. + IgniteFuture<Long> genFut = runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + byte[] val = new byte[valSize]; + ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); + + while (!stop.get() && !Thread.currentThread().isInterrupted()) { + Integer key = rnd.nextInt(keyRange); + + cache.putx(key, val); + + updCntr.incrementAndGet(); + + if (updSleepMs > 0) + U.sleep(updSleepMs); + } + + return true; + } + }, threadsCnt, "load-test-generator"); + + U.sleep(testDurSec * 1000); + + stop.set(true); + + genFut.get(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java new file mode 100644 index 0000000..9ccab68 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Load test that increments an atomic sequence in a partitioned cache from many threads. + */ +public class GridCachePartitionedAtomicLongLoadTest extends GridCommonAbstractTest { + /** Test duration in milliseconds (8 hours). 
*/ + private static final long DURATION = 8 * 60 * 60 * 1000; + + /** IP finder set on the discovery SPI of every configuration this test builds. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Counter incremented once per loop iteration by every worker. */ + private static final AtomicInteger idx = new AtomicInteger(); + + /** {@inheritDoc} Configures pessimistic repeatable-read transactions, one partitioned fully-synchronous cache with one backup, and TCP discovery. */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.getTransactionsConfiguration().setDefaultTxConcurrency(PESSIMISTIC); + c.getTransactionsConfiguration().setDefaultTxIsolation(REPEATABLE_READ); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(GridCacheMode.PARTITIONED); + cc.setStartSize(200); + cc.setPreloadMode(GridCachePreloadMode.SYNC); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setEvictionPolicy(new GridCacheLruEvictionPolicy<>(1000)); + cc.setBackups(1); + cc.setAffinity(new GridCacheConsistentHashAffinityFunction(true)); + cc.setAtomicSequenceReserveSize(10); + cc.setEvictSynchronized(true); + cc.setEvictNearSynchronized(true); + + c.setCacheConfiguration(cc); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + return c; + } + + /** + * @throws Exception If failed. 
+ */ + public void testLoad() throws Exception { + startGrid(); + + try { + multithreaded(new AtomicCallable(), 50); + } + finally { + stopGrid(); + } + } + + /** + * Worker that increments the atomic sequence named SEQUENCE between a transaction start and commit on each iteration, until the test duration elapses or the thread is interrupted. + */ + private class AtomicCallable implements Callable<Boolean> { + /** {@inheritDoc} Logs the sequence value on every 1000th increment of the counter and returns {@code true} when the loop ends. */ + @Override public Boolean call() throws Exception { + Ignite ignite = grid(); + + GridCache cache = ignite.cache(null); + + assert cache != null; + + GridCacheAtomicSequence seq = cache.dataStructures().atomicSequence("SEQUENCE", 0, true); + + long start = System.currentTimeMillis(); + + while (System.currentTimeMillis() - start < DURATION && !Thread.currentThread().isInterrupted()) { + IgniteTx tx = cache.txStart(); + + long seqVal = seq.incrementAndGet(); + + int curIdx = idx.incrementAndGet(); + + if (curIdx % 1000 == 0) + info("Sequence value [seq=" + seqVal + ", idx=" + curIdx + ']'); + + tx.commit(); + } + + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestJob.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestJob.java new file mode 100644 index 0000000..668ea80 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestJob.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.direct.multisplit; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; + +import java.io.*; + +/** + * Load test job that either calls the job target directly or executes a nested load test task, depending on its argument. + */ +public class GridLoadTestJob extends ComputeJobAdapter { + /** Ignite instance, annotated for injection; used to deploy and execute the nested task. */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * Constructor. + * @param arg Argument; asserted to be positive when the job executes. + */ + public GridLoadTestJob(Integer arg) { + super(arg); + } + + /** {@inheritDoc} Returns the job target result when the argument is 1; otherwise locally deploys {@link GridLoadTestTask} and returns the result of executing it with the same argument. 
*/ + @SuppressWarnings("unchecked") + @Override public Serializable execute() throws IgniteCheckedException { + Integer i = this.<Integer>argument(0); + + assert i != null && i > 0; + + if (i == 1) + return new GridLoadTestJobTarget().executeLoadTestJob(1); + + assert ignite != null; + + ignite.compute().localDeployTask(GridLoadTestTask.class, GridLoadTestTask.class.getClassLoader()); + + return (Integer) ignite.compute().execute(GridLoadTestTask.class.getName(), i); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestJobTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestJobTarget.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestJobTarget.java new file mode 100644 index 0000000..9c9a65b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestJobTarget.java @@ 
-0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.direct.multisplit; + +/** + * Load test job target. + */ +public class GridLoadTestJobTarget { + /** + * @param level Level; asserted to be positive and otherwise unused. + * @return Always returns {@code 1}. 
+ */ + public int executeLoadTestJob(int level) { + assert level > 0; + + return 1; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestTask.java new file mode 100644 index 0000000..4a5052a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridLoadTestTask.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.direct.multisplit; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; + +import java.util.*; + +/** + * Load test task that splits into as many jobs as its argument and sums their results. + */ +public class GridLoadTestTask extends ComputeTaskAdapter<Integer, Integer> { + /** Injected task session. */ + @IgniteTaskSessionResource + private ComputeTaskSession ctx; + + /** Ignite instance, annotated for injection; not referenced by this class. */ + @SuppressWarnings("unused") + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} Creates {@code arg} jobs, each with argument {@code arg - 1}, cycling through the subgrid nodes, and stores the assigned node IDs in the session attribute {@code nodes}. */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Integer arg) throws IgniteCheckedException { + assert arg != null; + assert arg > 1; + + Map<GridLoadTestJob, ClusterNode> map = new HashMap<>(subgrid.size()); + + Iterator<ClusterNode> iter = subgrid.iterator(); + + Collection<UUID> assigned = new ArrayList<>(subgrid.size()); + + for (int i = 0; i < arg; i++) { + // Recycle iterator. 
+ if (!iter.hasNext()) + iter = subgrid.iterator(); + + ClusterNode node = iter.next(); + + assigned.add(node.id()); + + map.put(new GridLoadTestJob(arg - 1), node); + } + + ctx.setAttribute("nodes", assigned); + + return map; + } + + /** {@inheritDoc} Returns the sum of the integer results of all jobs; asserts that no job failed or returned null. */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results != null; + + int retVal = 0; + + for (ComputeJobResult res : results) { + assert res.getException() == null : "Load test jobs can never fail: " + ctx; + assert res.getData() != null; + + retVal += (Integer)res.getData(); + } + + return retVal; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java new file mode 100644 index 0000000..a9bb491 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.direct.multisplit; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.gridgain.grid.loadtest.*; +import org.apache.ignite.spi.communication.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.config.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.concurrent.*; + +/** + * Multi-splits load test. + */ +@GridCommonTest(group = "Load Test") +public class GridMultiSplitsLoadTest extends GridCommonAbstractTest { + /** Passes {@code true} to the superclass constructor. */ + public GridMultiSplitsLoadTest() { + super(true); + } + + + /** {@inheritDoc} Always returns {@code null}. */ + @SuppressWarnings("ConstantConditions") + @Override public String getTestGridName() { + return null; + } + + /** {@inheritDoc} Sets new TCP communication and discovery SPIs and prestarts the core threads of the executor service. */ + @SuppressWarnings("ConstantConditions") + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + CommunicationSpi commSpi = new TcpCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + DiscoverySpi discoSpi = new TcpDiscoverySpi(); + + cfg.setDiscoverySpi(discoSpi); + + ((ThreadPoolExecutor)cfg.getExecutorService()).prestartAllCoreThreads(); + + return cfg; + } + + /** {@inheritDoc} Returns the configured test duration plus one minute, in milliseconds. 
*/ + @Override protected long getTestTimeout() { + return (getTestDurationInMinutes() + 1) * 60 * 1000; + } + + /** + * @return Time for load test in minutes, read from the {@code load.test.duration} test property. + */ + private int getTestDurationInMinutes() { + return Integer.valueOf(GridTestProperties.getProperty("load.test.duration")); + } + + /** + * @return Number of threads for the test, read from the {@code load.test.threadnum} test property. 
+ */ + private int getThreadCount() { + return Integer.valueOf(GridTestProperties.getProperty("load.test.threadnum")); + } + + /** + * Repeatedly executes {@link GridLoadTestTask} from multiple threads until the configured duration elapses, failing if a result differs from the expected factorial. + * + * @throws Exception If task execution failed. + */ + public void testLoad() throws Exception { + final Ignite ignite = G.ignite(getTestGridName()); + + final long end = getTestDurationInMinutes() * 60 * 1000 + System.currentTimeMillis(); + + // Warm up. + ignite.compute().withTimeout(5000).execute(GridLoadTestTask.class.getName(), 3); + + info("Load test will be executed for '" + getTestDurationInMinutes() + "' mins."); + info("Thread count: " + getThreadCount()); + + final GridLoadTestStatistics stats = new GridLoadTestStatistics(); + + GridTestUtils.runMultiThreaded(new Runnable() { + /** {@inheritDoc} */ + @Override public void run() { + IgniteCompute comp = ignite.compute().enableAsync(); + + while (end - System.currentTimeMillis() > 0) { + int levels = 3; + + int exp = factorial(levels); + + long start = System.currentTimeMillis(); + + try { + comp.execute(GridLoadTestTask.class, levels); + + ComputeTaskFuture<Integer> fut = comp.future(); + + int res = fut.get(); + + if (res != exp) + fail("Received wrong result [expected=" + exp + ", actual=" + res + ']'); + + long taskCnt = stats.onTaskCompleted(fut, exp, System.currentTimeMillis() - start); + + if (taskCnt % 500 == 0) + info(stats.toString()); + } + catch (IgniteCheckedException e) { + error("Failed to execute grid task.", e); + + fail(); + } + } + } + }, getThreadCount(), "grid-notaop-load-test"); + + info("Final test statistics: " + stats); + } + + /** + * Calculates factorial recursively. + * + * @param num Number whose factorial is calculated; asserted to be positive. + * @return Factorial for the number passed in. 
+ */ + private int factorial(int num) { + assert num > 0; + + if (num == 1) + return 1; + + return num * factorial(num -1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsRedeployLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsRedeployLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsRedeployLoadTest.java new file mode 100644 index 0000000..b000933 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsRedeployLoadTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.direct.multisplit; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.gridgain.grid.loadtest.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.config.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.concurrent.*; + +/** + * Multi splits redeploy load test. + */ +@GridCommonTest(group = "Load Test") +public class GridMultiSplitsRedeployLoadTest extends GridCommonAbstractTest { + /** Load test task type ID: the class name of {@link GridLoadTestTask}. */ + public static final String TASK_TYPE_ID = GridLoadTestTask.class.getName(); + + /** Passes {@code true} to the superclass constructor. */ + public GridMultiSplitsRedeployLoadTest() { + super(true); + } + + /** {@inheritDoc} Prestarts the core threads of the executor service. */ + @SuppressWarnings("ConstantConditions") + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + ((ThreadPoolExecutor)cfg.getExecutorService()).prestartAllCoreThreads(); + + return cfg; + } + + /** + * @return Test timeout: the configured test duration plus one minute, in milliseconds. + */ + @Override protected long getTestTimeout() { + return (getTestDurationInMinutes() + 1) * 60 * 1000; + } + + /** + * @return Time for load test in minutes, read from the {@code load.test.duration} test property. + */ + private int getTestDurationInMinutes() { + return Integer.valueOf(GridTestProperties.getProperty("load.test.duration")); + } + + /** + * @return Number of threads for the test, read from the {@code load.test.threadnum} test property. 
+ */ + private int getThreadCount() { + return Integer.valueOf(GridTestProperties.getProperty("load.test.threadnum")); + } + + /** + * Repeatedly executes the load test task from multiple threads until the configured duration elapses, redeploying the task whenever the count returned by the statistics is a multiple of 100. + * + * @throws Exception If task execution failed. + */ + public void testLoad() throws Exception { + final Ignite ignite = G.ignite(getTestGridName()); + + deployTask(ignite); + + final long end = getTestDurationInMinutes() * 60 * 1000 + System.currentTimeMillis(); + + // Warm up. 
+ ignite.compute().withTimeout(10000).execute(TASK_TYPE_ID, 3); + + info("Load test will be executed for '" + getTestDurationInMinutes() + "' mins."); + info("Thread count: " + getThreadCount()); + + final GridLoadTestStatistics stats = new GridLoadTestStatistics(); + + GridTestUtils.runMultiThreaded(new Runnable() { + /** {@inheritDoc} */ + @Override public void run() { + while (end - System.currentTimeMillis() > 0) { + int levels = 3; + + int exp = factorial(levels); + + long start = System.currentTimeMillis(); + + try { + ComputeTaskFuture<Integer> fut = ignite.compute().withTimeout(10000). + execute(TASK_TYPE_ID, levels); + + int res = fut.get(); + + if (res != exp) + fail("Received wrong result [expected=" + exp + ", actual=" + res + ']'); + + long taskCnt = stats.onTaskCompleted(fut, exp, System.currentTimeMillis() - start); + + if (taskCnt % 100 == 0) { + try { + deployTask(ignite); + } + catch (IgniteCheckedException e) { + error("Failed to deploy grid task.", e); + + fail(); + } + } + + if (taskCnt % 500 == 0) + info(stats.toString()); + } + catch (IgniteCheckedException e) { + error("Failed to execute grid task.", e); + + fail(); + } + } + } + }, getThreadCount(), "grid-notaop-load-test"); + + info("Final test statistics: " + stats); + } + + /** + * @param ignite Grid on which {@link GridLoadTestTask} is locally deployed. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private void deployTask(Ignite ignite) throws IgniteCheckedException { + ignite.compute().localDeployTask(GridLoadTestTask.class, GridLoadTestTask.class.getClassLoader()); + } + + /** + * Calculates factorial recursively. + * + * @param num Number whose factorial is calculated; asserted to be positive. + * @return Factorial for the number passed in. + */ + private int factorial(int num) { + assert num > 0; + + return num == 1 ? 
1 : num * factorial(num - 1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/package.html b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/package.html new file mode 100644 index 0000000..135eb1a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains internal tests or test related classes and interfaces. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitNewNodesTestJobTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitNewNodesTestJobTarget.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitNewNodesTestJobTarget.java new file mode 100644 index 0000000..611c180 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitNewNodesTestJobTarget.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.direct.newnodes; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; + +/** + * Single split on new nodes test job target. + */ +public class GridSingleSplitNewNodesTestJobTarget { + /** + * @param level Level; asserted to be positive and otherwise unused. 
+ * @param jobSes Job session; its attributes {@code 1st} and {@code 2nd} are awaited inside assert statements, so the waits run only when assertions are enabled. + * @return Always returns {@code 1}. + * @throws IgniteCheckedException If the wait for an attribute is interrupted. 
+ */ + @SuppressWarnings("unused") + public int executeLoadTestJob(int level, ComputeTaskSession jobSes) throws IgniteCheckedException { + assert level > 0; + assert jobSes != null; + + try { + assert "1".equals(jobSes.waitForAttribute("1st", 10000)); + + assert "2".equals(jobSes.waitForAttribute("2nd", 10000)); + } + catch (InterruptedException e) { + // Fail. + throw new IgniteCheckedException("Failed to wait for attribute.", e); + } + + return 1; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitNewNodesTestTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitNewNodesTestTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitNewNodesTestTask.java new file mode 100644 index 0000000..afac145 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitNewNodesTestTask.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.direct.newnodes; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; + +import java.io.*; +import java.util.*; + +/** + * Single split on new nodes test task. + */ +public class GridSingleSplitNewNodesTestTask extends ComputeTaskAdapter<Integer, Integer> { + /** Task session, annotated for injection; used to publish attributes to the jobs. */ + @IgniteTaskSessionResource + private ComputeTaskSession taskSes; + + /** Load balancer, annotated for injection; picks the node for each job. */ + @IgniteLoadBalancerResource + private ComputeLoadBalancer balancer; + + /** {@inheritDoc} Sets the session attributes {@code 1st} and {@code 2nd}, creates {@code arg} jobs mapped to nodes chosen by the load balancer, and stores the assigned node IDs in the session attribute {@code nodes}. */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Integer arg) throws IgniteCheckedException { + assert !subgrid.isEmpty() : "Subgrid cannot be empty."; + + Map<ComputeJobAdapter, ClusterNode> jobs = new HashMap<>(subgrid.size()); + + taskSes.setAttribute("1st", "1"); + taskSes.setAttribute("2nd", "2"); + + Collection<UUID> assigned = new ArrayList<>(subgrid.size()); + + for (int i = 0; i < arg; i++) { + ComputeJobAdapter job = new ComputeJobAdapter(1) { + /** Task session, annotated for injection; passed to the job target. */ + @IgniteTaskSessionResource + private ComputeTaskSession jobSes; + + /** {@inheritDoc} Delegates to {@link GridSingleSplitNewNodesTestJobTarget} with the job argument and the session. 
*/ + @Override public Serializable execute() throws IgniteCheckedException { + assert jobSes != null; + + Integer arg = this.<Integer>argument(0); + + assert arg != null; + + return new GridSingleSplitNewNodesTestJobTarget().executeLoadTestJob(arg, jobSes); + } + }; + + ClusterNode node = balancer.getBalancedNode(job, null); + + assert node != null; + + assigned.add(node.id()); + + jobs.put(job, node); + } + + taskSes.setAttribute("nodes", assigned); + + return jobs; + } + + /** {@inheritDoc} Returns the sum of the integer results of all jobs; asserts that every job returned data. */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + int retVal = 0; + + for (ComputeJobResult res : results) { + assert res.getData() != null : "Load test should return result: " + res; + + retVal += (Integer)res.getData(); + } + + return retVal; + } +}