http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobExecutionSingleNodeLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobExecutionSingleNodeLoadTest.java b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobExecutionSingleNodeLoadTest.java deleted file mode 100644 index 9f8a8e7..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobExecutionSingleNodeLoadTest.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.job; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.util.typedef.*; -import org.gridgain.loadtests.util.*; -import org.gridgain.testframework.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.compute.ComputeJobResultPolicy.*; - -/** - * This test measures the performance of task execution engine by - * submitting empty tasks and collecting the average tasks/second - * statistics. - */ -public class GridJobExecutionSingleNodeLoadTest { - /** Stats update interval in seconds. */ - private static final int UPDATE_INTERVAL_SEC = 10; - - /** Warm-up duration. */ - public static final int WARM_UP_DURATION = 60 * 1000; - - /** - * @param args Command line arguments: - * 1-st: Number of worker threads. Default: 32. - * 2-nd: Test duration in seconds. 0 means infinite. Default: 0. - * 3-rd: File to output test results to. - * @throws Exception If failed. - */ - public static void main(String[] args) throws Exception { - GridFileLock fileLock = GridLoadTestUtils.fileLock(); - - fileLock.lock(); - - try { - // Command line arguments. - int threadCnt = args.length == 0 ? 64 : Integer.parseInt(args[0]); - final int duration = args.length < 2 ? 0 : Integer.parseInt(args[1]); - final String outputFileName = args.length < 3 ? null : args[2]; - - final AtomicLong tasksCnt = new AtomicLong(); - - final AtomicBoolean finish = new AtomicBoolean(); - - ExecutorService pool = Executors.newFixedThreadPool(threadCnt); - - Collection<Callable<Object>> producers = new ArrayList<>(threadCnt); - - Thread collector = null; - - Thread timer = null; - - try { - final Ignite g = G.start("modules/core/src/test/config/grid-job-load.xml"); - - X.println("Warming up..."); - - GridLoadTestUtils.runMultithreadedInLoop(new Callable<Object>() { - @Override public Object call() throws IgniteCheckedException { - g.compute().execute(GridJobExecutionLoadTestTask.class, null); - - return null; - } - }, threadCnt, WARM_UP_DURATION); - - System.gc(); - - X.println("Running main test."); - - for (int i = 0; i < threadCnt; i++) - producers.add(new Callable<Object>() { - @SuppressWarnings({"unchecked", "InfiniteLoopStatement"}) - @Override public Object call() throws Exception { - while (!finish.get()) { - try { - g.compute().execute(GridJobExecutionLoadTestTask.class, null); - - tasksCnt.incrementAndGet(); - } - catch (ComputeTaskCancelledException ignored) { - // No-op. - } - catch (IgniteCheckedException e) { - e.printStackTrace(); - } - } - - return null; - } - }); - - // Thread that measures and outputs performance statistics. - collector = new Thread(new Runnable() { - @SuppressWarnings({"BusyWait", "InfiniteLoopStatement"}) - @Override public void run() { - GridCumulativeAverage avgTasksPerSec = new GridCumulativeAverage(); - - try { - while (!finish.get()) { - long cnt0 = tasksCnt.get(); - - Thread.sleep(UPDATE_INTERVAL_SEC * 1000); - - long cnt1 = tasksCnt.get(); - - long curTasksPerSec = (cnt1 - cnt0) / UPDATE_INTERVAL_SEC; - X.println(">>> Tasks/s: " + curTasksPerSec); - - avgTasksPerSec.update(curTasksPerSec); - } - } - catch (InterruptedException ignored) { - X.println(">>> Interrupted."); - - Thread.currentThread().interrupt(); - } - - X.println(">>> Average tasks/s: " + avgTasksPerSec); - - if (outputFileName != null) { - X.println("Writing test results to a file: " + outputFileName); - - try { - GridLoadTestUtils.appendLineToFile( - outputFileName, - "%s,%d", - GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()), - avgTasksPerSec.get()); - } - catch (IOException e) { - X.error("Failed to output to a file", e); - } - } - } - }); - collector.start(); - - if (duration > 0) { - // Thread that stops the test after a specified period of time. - timer = new Thread(new Runnable() { - @Override public void run() { - try { - Thread.sleep(duration * 1000); - - finish.set(true); - } - catch (InterruptedException ignored) { - // No-op. - } - } - }); - timer.start(); - } - - pool.invokeAll(producers); - - X.println("All done, stopping."); - - collector.interrupt(); - - pool.shutdown(); - } - finally { - if (collector != null && !collector.isInterrupted()) - collector.interrupt(); - - if (timer != null) - timer.interrupt(); - - G.stopAll(true); - } - } - finally { - fileLock.close(); - } - } - - /** - * Empty task (spawns one empty job). - */ - private static class GridJobExecutionLoadTestTask implements ComputeTask<Object, Object> { - /** {@inheritDoc} */ - @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg) - throws IgniteCheckedException { - return F.asMap(new GridJobExecutionLoadTestJob(), subgrid.get(0)); - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { - return REDUCE; - } - - /** {@inheritDoc} */ - @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - return null; - } - } - - /** - * Empty job. - */ - private static class GridJobExecutionLoadTestJob implements ComputeJob { - /** {@inheritDoc} */ - @Override public Object execute() throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - // No-op. - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java deleted file mode 100644 index d875b97..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.job; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.loadtests.util.*; -import org.gridgain.testframework.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.compute.ComputeJobResultPolicy.*; - -/** - * This test measures the performance of task execution engine by - * submitting empty tasks and collecting the average tasks/second - * statistics. - */ -public class GridJobExecutionSingleNodeSemaphoreLoadTest { - /** Stats update interval in seconds. */ - private static final int UPDATE_INTERVAL_SEC = 10; - - /** Warm-up duration. */ - public static final int WARM_UP_DURATION = 60 * 1000; - - /** - * @param args Command line arguments: - * 1-st: Number of worker threads. Default equals to available CPU number / 2. - * 2-nd: Concurrent tasks count. Default: 1024. - * 3-rd: Test duration in seconds. 0 means infinite. Default: 0. - * 4-th: File to output test results to. - * @throws Exception If failed. - */ - public static void main(String[] args) throws Exception { - GridFileLock fileLock = GridLoadTestUtils.fileLock(); - - fileLock.lock(); - - try { - // Command line arguments. - // - // NOTE: on MacOS better numbers are shown if public pool core and max sizes are - // equal to CPU count. And producer threads count is equal to CPU count. - // - int threadCnt = args.length > 0 ? Integer.parseInt(args[0]) : - Runtime.getRuntime().availableProcessors() / 2; - int taskCnt = args.length > 1 ? Integer.parseInt(args[1]) : 1024; - final int duration = args.length > 2 ? Integer.parseInt(args[2]) : 0; - final String outputFileName = args.length > 3 ? args[3] : null; - - final LongAdder execCnt = new LongAdder(); - - try { - final Ignite g = G.start("modules/tests/config/grid-job-load.xml"); - - X.println("Thread count: " + threadCnt); - X.println("Task count: " + taskCnt); - X.println("Duration: " + duration); - - X.println("Warming up..."); - - g.compute().execute(GridJobExecutionLoadTestTask.class, null); - g.compute().execute(GridJobExecutionLoadTestTask.class, null); - - runTest(g, threadCnt, taskCnt, WARM_UP_DURATION, execCnt); - - System.gc(); - - execCnt.reset(); - - X.println("Running main test."); - - IgniteFuture<Void> collectorFut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - GridCumulativeAverage avgTasksPerSec = new GridCumulativeAverage(); - - try { - while (!Thread.currentThread().isInterrupted()) { - U.sleep(UPDATE_INTERVAL_SEC * 1000); - - long curTasksPerSec = execCnt.sumThenReset() / UPDATE_INTERVAL_SEC; - - X.println(">>> Tasks/s: " + curTasksPerSec); - - avgTasksPerSec.update(curTasksPerSec); - } - } - catch (IgniteInterruptedException ignored) { - X.println(">>> Interrupted."); - - Thread.currentThread().interrupt(); - } - - X.println(">>> Average tasks/s: " + avgTasksPerSec); - - if (outputFileName != null) { - X.println("Writing test results to a file: " + outputFileName); - - try { - GridLoadTestUtils.appendLineToFile( - outputFileName, - "%s,%d", - GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()), - avgTasksPerSec.get()); - } - catch (IOException e) { - X.error("Failed to output to a file", e); - } - } - - return null; - } - }); - - runTest(g, threadCnt, taskCnt, duration * 1000, execCnt); - - X.println("All done, stopping."); - - collectorFut.cancel(); - } - finally { - G.stopAll(true); - } - } - finally { - fileLock.close(); - } - } - - /** - * Runs the actual load test. - * - * @param g Grid. - * @param threadCnt Number of threads. - * @param taskCnt Number of tasks. - * @param dur Test duration. - * @param iterCntr Iteration counter. - */ - private static void runTest(final Ignite g, int threadCnt, int taskCnt, long dur, - final LongAdder iterCntr) { - final Semaphore sem = new Semaphore(taskCnt); - - final IgniteInClosure<IgniteFuture> lsnr = new CI1<IgniteFuture>() { - @Override public void apply(IgniteFuture t) { - sem.release(); - } - }; - - GridLoadTestUtils.runMultithreadedInLoop(new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - sem.acquire(); - - IgniteCompute comp = g.compute().enableAsync(); - - comp.execute(GridJobExecutionLoadTestTask.class, null); - - ComputeTaskFuture<Object> f = comp.future(); - - f.listenAsync(lsnr); - - iterCntr.increment(); - - return null; - } - }, threadCnt, dur > 0 ? dur : Long.MAX_VALUE); - } - - /** - * Empty task (spawns one empty job). - */ - private static class GridJobExecutionLoadTestTask implements ComputeTask<Object, Object> { - /** {@inheritDoc} */ - @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg) - throws IgniteCheckedException { - return F.asMap(new GridJobExecutionLoadTestJob(), subgrid.get(0)); - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { - return REDUCE; - } - - /** {@inheritDoc} */ - @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - return null; - } - } - - /** - * Empty job. - */ - private static class GridJobExecutionLoadTestJob implements ComputeJob { - /** {@inheritDoc} */ - @Override public Object execute() throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - // No-op. - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTest.java b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTest.java deleted file mode 100644 index c5deb7f..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTest.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.job; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.springframework.beans.factory.*; -import org.springframework.context.support.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Tests task/job behavior under load. - */ -public class GridJobLoadTest implements Runnable { - /** Test configuration directory. */ - private static final File TEST_CONF_DIR; - - /** - * - */ - static { - try { - TEST_CONF_DIR = new File(U.resolveGridGainUrl("/modules/core/src/test/config/job-loadtest").toURI()); - } - catch (URISyntaxException e) { - throw new RuntimeException("Failed to initialize directory.", e); - } - } - - /** File with test configuration. */ - private static final File TEST_CONFIGURATION_FILE = new File(TEST_CONF_DIR, "job-loadtest.properties"); - - /** File with client nodes configuration. */ - private static final File CLIENT_NODE_CONFIGURATION = new File(TEST_CONF_DIR, "client.xml"); - - /** File with server nodes configuration. */ - private static final File SERVER_NODE_CONFIGURATION = new File(TEST_CONF_DIR, "server.xml"); - - /** - * Main method. - * - * @param args Command-line parameters. - * @throws Exception if test start failed. - */ - public static void main(String[] args) throws Exception { - new GridJobLoadTest().run(); - } - - /** Number of client nodes to run. */ - private int clientNodes; - - /** Number of server nodes to run. */ - private int srvNodes; - - /** Number of submitter threads per client node. */ - private int threadsPerClient; - - /** Parameters for executing jobs. */ - private GridJobLoadTestParams taskParams; - - /** Submission/cancel ratio for submitting threads. */ - private int cancelRate; - - /** Time to sleep between task submissions. */ - private long submitDelay; - - /** Number of nodes running inside this test. */ - private int runningNodes; - - /** - * Generate new node number. - * - * @return a client number unique within this test run. - */ - private int getNextNodeNum() { - return ++runningNodes; - } - - /** - * Loads test configuration. - * - * @throws Exception if configuration is unawailable or broken. - */ - private void loadTestConfiguration() throws Exception { - assert TEST_CONFIGURATION_FILE.isFile(); - - InputStream in = null; - - Properties p = new Properties(); - - try { - in = new FileInputStream(TEST_CONFIGURATION_FILE); - - p.load(in); - } - finally { - U.closeQuiet(in); - } - - clientNodes = Integer.parseInt(p.getProperty("client.nodes.count")); - srvNodes = Integer.parseInt(p.getProperty("server.nodes.count")); - threadsPerClient = Integer.parseInt(p.getProperty("threads.per.client")); - cancelRate = Integer.parseInt(p.getProperty("cancel.rate")); - submitDelay = Long.parseLong(p.getProperty("submit.delay")); - - taskParams = new GridJobLoadTestParams( - Integer.parseInt(p.getProperty("jobs.count")), - Integer.parseInt(p.getProperty("jobs.test.duration")), - Integer.parseInt(p.getProperty("jobs.test.completion.delay")), - Double.parseDouble(p.getProperty("jobs.failure.probability")) - ); - } - - /** {@inheritDoc} */ - @Override public void run() { - List<Ignite> clientIgnites = runGrid(); - - assert clientIgnites.size() == clientNodes; - - int threadsCnt = clientNodes * threadsPerClient; - - Executor e = Executors.newFixedThreadPool(threadsCnt); - - for (Ignite ignite : clientIgnites) { - for (int j = 0; j < threadsPerClient; j++) - e.execute(new GridJobLoadTestSubmitter(ignite, taskParams, cancelRate, submitDelay)); - } - } - - /** - * Run all grid nodes as defined in test configuration. - * - * @return list of run nodes. - */ - private List<Ignite> runGrid() { - List<Ignite> clientIgnites = new ArrayList<>(clientNodes); - - try { - loadTestConfiguration(); - - for (int i = 0; i < srvNodes; i++) - startNode("server", SERVER_NODE_CONFIGURATION); - - // Start clients in the second order to cache a client node in GridGain. - for (int i = 0; i < clientNodes; i++) - clientIgnites.add(startNode("client", CLIENT_NODE_CONFIGURATION)); - } - catch (Exception e) { - throw new RuntimeException(e); - } - - return clientIgnites; - } - - /** - * Starts new grid node. - * - * @param gridName name of new node. - * @param springCfg file with spring configuration to use for this node. - * @return a grid instance local to new node {@link org.apache.ignite.Ignition#start(org.apache.ignite.configuration.IgniteConfiguration)}. - * @throws Exception if node run failed. - */ - protected Ignite startNode(String gridName, File springCfg) throws Exception { - assert springCfg != null; - - ListableBeanFactory springCtx = new FileSystemXmlApplicationContext( - "file:///" + springCfg.getAbsolutePath()); - - Map cfgMap = springCtx.getBeansOfType(IgniteConfiguration.class); - - assert cfgMap != null; - assert !cfgMap.isEmpty(); - - IgniteConfiguration cfg = (IgniteConfiguration)cfgMap.values().iterator().next(); - - cfg.setGridName(gridName + "-" + getNextNodeNum()); - - return G.start(cfg); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestJob.java b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestJob.java deleted file mode 100644 index a9ed5a7..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestJob.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.job; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.resources.*; -import org.gridgain.grid.*; - -import java.util.*; - -import static java.lang.Thread.*; - -/** - * Job for load test. - */ -public class GridJobLoadTestJob implements ComputeJob { - /** Length of the sequence emitted into session attributes. */ - private static final int EMIT_SEQUENCE_LENGTH = 10; - - /** Flag indicating whether this job should emit sequence into session attributes. */ - private final boolean emitAttrs; - - /** Probability of failure. */ - private final double failProbability; - - /** Duration between job start and random failure check. */ - private final long executionDuration; - - /** Duration between failure check and returning from {@link GridJobLoadTestJob#execute()}. */ - private final int completionDelay; - - /** Logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Ignite instance. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Job context. */ - @IgniteJobContextResource - private ComputeJobContext cntx; - - /** Task session. */ - @IgniteTaskSessionResource - private ComputeTaskSession taskSes; - - /** - * @param emitAttrs if {@code true} then this work should emit number sequence into session attribute - * @param failProbability Probability of failure. - * @param executionDuration Duration between job start and random failure check. - * @param completionDelay Duration between failure check and returning from - * {@link GridJobLoadTestJob#execute()}. - */ - public GridJobLoadTestJob(boolean emitAttrs, double failProbability, long executionDuration, - int completionDelay) { - this.emitAttrs = emitAttrs; - this.failProbability = failProbability; - this.executionDuration = executionDuration; - this.completionDelay = completionDelay; - } - - /**{@inheritDoc}*/ - @Override public void cancel() { - Thread.currentThread().interrupt(); - } - - /**{@inheritDoc}*/ - @Override public Integer execute() throws IgniteCheckedException { - try { - if (log.isInfoEnabled()) - log.info("Job started " + getJobInfo()); - - doJob(); - - if (new Random().nextDouble() <= failProbability) { - if (log.isInfoEnabled()) - log.info("Failing job " + getJobInfo()); - - throw new RuntimeException("Task failure simulation"); - } - - sleep(new Random().nextInt(completionDelay)); - - if (log.isInfoEnabled()) - log.info("Job is completing normally " + getJobInfo()); - } - catch (InterruptedException ignored) { - if (log.isDebugEnabled()) - log.debug("Job was cancelled " + getJobInfo()); - - // Let the method return normally. - } - - return 1; - } - - /** - * Performs job actions, depending on {@code emitAttributes} and {@code executionDuration} attribute values. - * - * @throws InterruptedException if task was cancelled during job execution. - */ - @SuppressWarnings("BusyWait") - private void doJob() throws InterruptedException { - if (emitAttrs) { - for (int i = 0; i < EMIT_SEQUENCE_LENGTH; i++) { - try { - taskSes.setAttribute(String.valueOf(i), i); - } - catch (IgniteCheckedException e) { - log.error("Set attribute failed.", e); - } - - sleep(executionDuration); - } - } - else { - sleep(executionDuration); - - Map<?, ?> attrs = taskSes.getAttributes(); - - boolean valMissed = false; - - for (int i = 0; i < EMIT_SEQUENCE_LENGTH; i++) { - Integer val = (Integer) attrs.get(String.valueOf(i)); - - // We shouldn't run in situation when some elements emitted before are missed and the current exists. - assert ! (valMissed && val != null) : - "Inconsistent session attribute set was received [missedAttribute=" + i + - ", jobId=" + cntx.getJobId() + ", attrs=" + attrs + ", nodeId=" + - ignite.configuration().getNodeId() + "]"; - - valMissed = (val == null); - } - } - } - - /** - * Gives job description in standard log format. - * - * @return String with current job representation. - */ - private String getJobInfo() { - return "[taskId=" + taskSes.getId() + ", jobId=" + cntx.getJobId() + ", nodeId=" + - ignite.configuration().getNodeId() + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestParams.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestParams.java b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestParams.java deleted file mode 100644 index bdfe969..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestParams.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.job; - -/** - * Test task parameters. - */ -public class GridJobLoadTestParams { - /** Number of jobs to be spawned. */ - private final int jobsCnt; - - /** Duration between job start and failure check. */ - private final long executionDuration; - - /** Duration between failure check and job completion. */ - private final int completionDelay; - - /** Probability of simulated job failure. */ - private final double jobFailureProbability; - - /** - * @param jobsCnt Number of jobs to be spawned. - * @param executionDuration Duration between job start and failure check. - * @param completionDelay Duration between failure check and job completion. - * @param jobFailureProbability Probability of simulated job failure. - */ - public GridJobLoadTestParams(int jobsCnt, long executionDuration, int completionDelay, double jobFailureProbability) { - this.jobsCnt = jobsCnt; - this.executionDuration = executionDuration; - this.completionDelay = completionDelay; - this.jobFailureProbability = jobFailureProbability; - } - - /** - * Returns number of jobs to be spawned. - * - * @return Number of jobs to be spawned. - */ - public int getJobsCount() { - return jobsCnt; - } - - /** - * Returns duration between job start and failure check. - * - * @return Duration between job start and failure check. - */ - public long getExecutionDuration() { - return executionDuration; - } - - /** - * Returns duration between failure check and job completion. - * - * @return Duration between failure check and job completion. - */ - public int getCompletionDelay() { - return completionDelay; - } - - /** - * Returns probability of simulated job failure. - * - * @return Probability of simulated job failure. - */ - public double getJobFailureProbability() { - return jobFailureProbability; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestSubmitter.java b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestSubmitter.java deleted file mode 100644 index dac7452..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestSubmitter.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.job; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.*; - -import java.util.*; - -/** - * Runnable with continuous task submission and result checking. - */ -public class GridJobLoadTestSubmitter implements Runnable { - /** */ - public static final int TIMEOUT = 120000; - - /** Grid where all tasks should be submitted. */ - private final Ignite ignite; - - /** Params of simulated jobs. */ - private final GridJobLoadTestParams params; - - /** Time to sleep between task submissions. */ - private final long submitDelay; - - /** Submission/cancel ratio. */ - private final int cancelRate; - - /** List of futures for submitted tasks. */ - private final List<ComputeTaskFuture<Integer>> futures = new LinkedList<>(); - - /** Counter to implement fixed submit/cancel ratio. */ - private int iteration; - - /** - * @param ignite Grid where all tasks should be submitted. - * @param params Params of simulated jobs. - * @param cancelRate Submission/cancel ratio. - * @param submitDelay Time to sleep between task submissions. - */ - public GridJobLoadTestSubmitter(Ignite ignite, GridJobLoadTestParams params, int cancelRate, long submitDelay) { - this.ignite = ignite; - this.params = params; - this.cancelRate = cancelRate; - this.submitDelay = submitDelay; - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override public void run() { - IgniteCompute comp = ignite.compute().enableAsync(); - - while (true) { - checkCompletion(); - - performCancel(); - - try { - Thread.sleep(submitDelay); - } - catch (InterruptedException ignored) { - return; - } - - try { - comp.withTimeout(TIMEOUT).execute(GridJobLoadTestTask.class, params); - - futures.add(comp.<Integer>future()); - } - catch (IgniteCheckedException e) { - // Should not be thrown since uses asynchronous execution. - throw new IgniteException(e); - } - } - } - - /** - * Finds completed tasks in the queue and removes them. - */ - private void checkCompletion() { - for (Iterator<ComputeTaskFuture<Integer>> iter = futures.iterator(); iter.hasNext();) { - ComputeTaskFuture<Integer> fut = iter.next(); - - if (fut.isDone()) { - try { - Integer res = fut.get(); - - assert res == params.getJobsCount() : - "Task returned wrong result [taskIs=" + fut.getTaskSession().getId() + ", result=" + res + "]"; - - ignite.log().info(">>> Task completed successfully. Task id: " + fut.getTaskSession().getId()); - } - catch (IgniteFutureCancelledException ignored) { - ignite.log().info(">>> Task cancelled: " + fut.getTaskSession().getId()); - } - catch (IgniteCheckedException e) { - ignite.log().warning( - ">>> Get operation for completed task failed: " + fut.getTaskSession().getId(), e); - } - finally { - iter.remove(); - } - } - } - } - - /** - * Cancel a random task when required. - */ - private void performCancel() { - iteration++; - - if (iteration % cancelRate == 0) { - // Here we should have mostly running futures so just pick one. - ComputeTaskFuture<Integer> futToCancel = futures.get( new Random().nextInt(futures.size()) ); - - try { - futToCancel.cancel(); - ignite.log().info("Task canceled: " + futToCancel.getTaskSession().getId()); - } - catch (IgniteCheckedException e) { - ignite.log().warning(">>> Future cancellation failed: " + futToCancel.getTaskSession().getId(), e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestTask.java b/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestTask.java deleted file mode 100644 index 893a8b5..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/job/GridJobLoadTestTask.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.job; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.compute.ComputeJobResultPolicy.*; - -/** - * Test task for {@link GridJobLoadTest} - */ -public class GridJobLoadTestTask extends ComputeTaskAdapter<GridJobLoadTestParams, Integer> { - /**{@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable GridJobLoadTestParams arg) - throws IgniteCheckedException { - assert !subgrid.isEmpty(); - - Map<ComputeJob, ClusterNode> jobs = new HashMap<>(); - - for (int i = 0; i < arg.getJobsCount(); i++) - jobs.put( - new GridJobLoadTestJob( - /*only on the first step*/i == 0, - arg.getJobFailureProbability(), - arg.getExecutionDuration(), - arg.getCompletionDelay()), - subgrid.get(0)); - - return jobs; - } - - /** - * Always trying to failover job, except failed assertions. - * - * {@inheritDoc} - */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { - return res.getException() == null ? WAIT : - res.getException().getCause() instanceof AssertionError ? REDUCE : FAILOVER; - } - - /**{@inheritDoc} */ - @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - int sum = 0; - - for (ComputeJobResult r: results) { - if (!r.isCancelled() && r.getException() == null) - sum += r.<Integer>getData(); - } - - return sum; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/job/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/job/package.html b/modules/core/src/test/java/org/gridgain/loadtests/job/package.html deleted file mode 100644 index e291932..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/job/package.html +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> -<!-- Package description. --> -Contains internal tests or test related classes and interfaces. -</body> -</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/lang/GridConcurrentLinkedHashMapBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/lang/GridConcurrentLinkedHashMapBenchmark.java b/modules/core/src/test/java/org/gridgain/loadtests/lang/GridConcurrentLinkedHashMapBenchmark.java deleted file mode 100644 index 37a759b..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/lang/GridConcurrentLinkedHashMapBenchmark.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.lang; - -import org.apache.ignite.internal.util.typedef.*; -import org.jdk8.backport.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Benchmark for different accessors in {@link ConcurrentLinkedHashMap}. - */ -public class GridConcurrentLinkedHashMapBenchmark { - /** Number of keys to use in benchmark. */ - private static final int KEY_RANGE = 1000; - - /** Amount of writes from total number of iterations. */ - private static final double WRITE_RATE = 0.2; - - /** - * @param args Command-line arguments. - */ - public static void main(String[] args) { - System.out.printf("%8s, %8s, %12s, %12s, %12s, %8s, %8s\n", - "Method", "Threads", "It./s.", "It./s.*th.", "Iters.", "Time", "Writes"); - - for (int i = 1; i <= 32; i*=2) - testGet(i, WRITE_RATE); - - for (int i = 1; i <= 32; i*=2) - testGetSafe(i, WRITE_RATE); - } - - /** - * Tests {@link ConcurrentLinkedHashMap#getSafe(Object)} method. - * - * @param threadCnt Number of threads to run. - * @param writeProportion Amount of writes from total number of iterations. - */ - public static void testGetSafe(int threadCnt, double writeProportion) { - test(new C2<Integer, ConcurrentLinkedHashMap<Integer, Integer>, Integer>() { - @Override public Integer apply(Integer key, ConcurrentLinkedHashMap<Integer, Integer> map) { - return map.getSafe(key); - } - - @Override public String toString() { - return "getSafe"; - } - }, threadCnt, writeProportion); - } - - /** - * Tests {@link ConcurrentLinkedHashMap#get(Object)} method. - * - * @param threadCnt Number of threads to run. - * @param writeProportion Amount of writes from total number of iterations. - */ - public static void testGet(int threadCnt, double writeProportion) { - test(new C2<Integer, ConcurrentLinkedHashMap<Integer, Integer>, Integer>() { - @Override public Integer apply(Integer key, ConcurrentLinkedHashMap<Integer, Integer> map) { - return map.get(key); - } - - @Override public String toString() { - return "get"; - } - }, threadCnt, writeProportion); - } - - /** - * Test a generic access method on map. - * - * @param readOp Access method to test. - * @param threadCnt Number of threads to run. - * @param writeProportion Amount of writes from total number of iterations. - */ - @SuppressWarnings({"BusyWait"}) - private static void test(C2<Integer, ConcurrentLinkedHashMap<Integer, Integer>, Integer> readOp, int threadCnt, - double writeProportion) { - assert writeProportion < 1; - - ConcurrentLinkedHashMap<Integer, Integer> map = new ConcurrentLinkedHashMap<>(); - - CyclicBarrier barrier = new CyclicBarrier(threadCnt + 1); - - Collection<TestThread> threads = new ArrayList<>(threadCnt); - - for (int i = 0; i < threadCnt; i++) { - TestThread thread = new TestThread(readOp, map, writeProportion, barrier); - - threads.add(thread); - - thread.start(); - } - - long start; - - try { - // Wait threads warm-up. - while (barrier.getNumberWaiting() != threadCnt) - Thread.sleep(1); - - // Starting test and letting it run for 1 minute. - barrier.await(); - - start = System.currentTimeMillis(); - - Thread.sleep(60000); - } - catch (InterruptedException ignored) { - return; - } - catch (BrokenBarrierException e) { - e.printStackTrace(); - - return; - } - - for (TestThread th : threads) - th.interrupt(); - - try { - for (TestThread th : threads) - th.join(); - } - catch (InterruptedException ignored) { - return; - } - - long time = System.currentTimeMillis() - start; - - long iters = 0; - - for (TestThread th : threads) - iters += th.iterations(); - - System.out.printf("%8s, %8d, %12d, %12d, %12d, %8.3f, %8.2f\n", - readOp.toString(), threadCnt, 1000*iters/time, 1000*iters/(time*threadCnt), iters, time/(double)1000, writeProportion); - } - - /** - * Test thread. Performs read/write operations on the given map - * with the given ration. - */ - private static class TestThread extends Thread { - /** */ - private final C2<Integer, ConcurrentLinkedHashMap<Integer, Integer>, Integer> readOp; - - /** */ - private final ConcurrentLinkedHashMap<Integer, Integer> map; - - /** */ - private final double writeProportion; - - /** */ - private final CyclicBarrier barrier; - - /** */ - private final Random rnd = new Random(); - - /** Number of passed run iterations. */ - private long iterations; - - /** - * @param readOp Read operation to test. - * @param map Map to test. - * @param writeProportion Amount of writes from total number of iterations. - * @param barrier Barrier to await before starting 'clear' run. - */ - TestThread(final C2<Integer, ConcurrentLinkedHashMap<Integer, Integer>, Integer> readOp, - ConcurrentLinkedHashMap<Integer, Integer> map, double writeProportion, CyclicBarrier barrier) { - this.readOp = readOp; - this.map = map; - this.writeProportion = writeProportion; - this.barrier = barrier; - } - - /** {@inheritDoc} */ - @Override public void run() { - for (int i = 0; i < 1000000; i++) - doIteration(); - - try { - barrier.await(); - } - catch (InterruptedException ignored) { - return; - } - catch (BrokenBarrierException e) { - e.printStackTrace(); - - return; - } - - while (!interrupted()) { - doIteration(); - - iterations++; - } - } - - /** - * Performs test iteration. - */ - private void doIteration() { - Integer key = rnd.nextInt(KEY_RANGE); - - if (rnd.nextDouble() <= writeProportion) - map.put(key, rnd.nextInt()); - else - readOp.apply(key, map); - } - - /** - * @return Number of passes iterations. - */ - public long iterations() { - return iterations; - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperLoadTest1.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperLoadTest1.java b/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperLoadTest1.java deleted file mode 100644 index 6c2f180..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperLoadTest1.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.mapper; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; - -/** - * Continuous mapper load test. - */ -public class GridContinuousMapperLoadTest1 { - /** - * Main method. - * - * @param args Parameters. - * @throws IgniteCheckedException If failed. - */ - public static void main(String[] args) throws IgniteCheckedException { - try (Ignite g = G.start("examples/config/example-cache.xml")) { - int max = 30000; - - IgniteDataLoader<Integer, TestObject> ldr = g.dataLoader("replicated"); - - for (int i = 0; i < max; i++) - ldr.addData(i, new TestObject(i, "Test object: " + i)); - - // Wait for loader to complete. - ldr.close(false); - - X.println("Populated replicated cache."); - - g.compute().execute(new GridContinuousMapperTask1(), max); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperLoadTest2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperLoadTest2.java b/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperLoadTest2.java deleted file mode 100644 index acb3aa8..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperLoadTest2.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.mapper; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Continuous mapper load test. - */ -public class GridContinuousMapperLoadTest2 { - /** - * Main method. - * - * @param args Parameters. - * @throws Exception If failed. - */ - public static void main(String[] args) throws Exception { - final AtomicInteger jobIdGen = new AtomicInteger(); - final AtomicInteger sentJobs = new AtomicInteger(); - - final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); - - /** Worker thread. */ - Thread t = new Thread("mapper-worker") { - @Override public void run() { - try { - while (!Thread.currentThread().isInterrupted()) - queue.put(jobIdGen.incrementAndGet()); - } - catch (InterruptedException ignore) { - // No-op. - } - } - }; - - Ignite g = G.start("examples/config/example-cache.xml"); - - try { - int max = 20000; - - IgniteDataLoader<Integer, TestObject> ldr = g.dataLoader("replicated"); - - for (int i = 0; i < max; i++) - ldr.addData(i, new TestObject(i, "Test object: " + i)); - - // Wait for loader to complete. - ldr.close(false); - - X.println("Populated replicated cache."); - - t.start(); - - while (sentJobs.get() < max) { - int[] jobIds = new int[10]; - - for (int i = 0; i < jobIds.length; i++) - jobIds[i] = queue.take(); - - sentJobs.addAndGet(10); - - g.compute().execute(new GridContinuousMapperTask2(), jobIds); - } - } - finally { - t.interrupt(); - - t.join(); - - G.stopAll(false); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperTask1.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperTask1.java b/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperTask1.java deleted file mode 100644 index 6b5ed4a..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperTask1.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.mapper; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Test task. - */ -@SuppressWarnings("TransientFieldNotInitialized") -@ComputeTaskNoResultCache -public class GridContinuousMapperTask1 extends ComputeTaskAdapter<Integer, Integer> { - /** Job ID generator. */ - private final transient AtomicInteger jobIdGen = new AtomicInteger(); - - /** Mapper. */ - @IgniteTaskContinuousMapperResource - private ComputeTaskContinuousMapper mapper; - - /** Grid. */ - @IgniteInstanceResource - private Ignite g; - - /** Blocking queue. */ - private final transient LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); - - /** Sent jobs count. */ - private final transient AtomicInteger sentJobs = new AtomicInteger(); - - /** Maximum number of executions. */ - private transient int maxExecs; - - /** Worker thread. */ - private transient Thread t = new Thread("mapper-worker") { - @Override public void run() { - try { - while (!Thread.currentThread().isInterrupted()) - queue.put(jobIdGen.getAndIncrement()); - } - catch (InterruptedException ignore) { - // No-op. - } - } - }; - - /** - * Sends job to node. - * - * @param n Node. - * @throws IgniteCheckedException If failed. - */ - private void sendJob(ClusterNode n) throws IgniteCheckedException { - try { - int jobId = queue.take(); - - sentJobs.incrementAndGet(); - - mapper.send(new ComputeJobAdapter(jobId) { - @IgniteInstanceResource - private Ignite g; - - @Override public Object execute() { - Integer jobId = argument(0); - - X.println(">>> Received job for ID: " + jobId); - - return g.cache("replicated").peek(jobId); - } - }, n); - } - catch (InterruptedException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Integer arg) - throws IgniteCheckedException { - maxExecs = arg; - - // Start worker thread. - t.start(); - - if (g.cluster().nodes().size() == 1) - sendJob(g.cluster().localNode()); - else - for (ClusterNode n : g.cluster().forRemotes().nodes()) - sendJob(n); - - return null; - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { - if (res.getException() != null) - throw new IgniteCheckedException(res.getException()); - - TestObject o = res.getData(); - - assert o != null; - - X.println("Received job result from node [resId=" + o.getId() + ", node=" + res.getNode().id() + ']'); - - if (sentJobs.get() < maxExecs) - sendJob(res.getNode()); - - return ComputeJobResultPolicy.WAIT; - } - - /** {@inheritDoc} */ - @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - X.println(">>> Reducing task..."); - - t.interrupt(); - - try { - t.join(); - } - catch (InterruptedException e) { - throw new IgniteCheckedException(e); - } - - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperTask2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperTask2.java b/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperTask2.java deleted file mode 100644 index fc3b85d..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridContinuousMapperTask2.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.mapper; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Test task. - */ -public class GridContinuousMapperTask2 extends ComputeTaskAdapter<int[], Integer> { - /** Grid. */ - @IgniteInstanceResource - private Ignite g; - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable int[] jobIds) - throws IgniteCheckedException { - Map<ComputeJob, ClusterNode> mappings = new HashMap<>(jobIds.length); - - Iterator<ClusterNode> nodeIter = g.cluster().forRemotes().nodes().iterator(); - - for (int jobId : jobIds) { - ComputeJob job = new ComputeJobAdapter(jobId) { - @IgniteInstanceResource - private Ignite g; - - @Override public Object execute() { - Integer jobId = argument(0); - - X.println(">>> Received job for ID: " + jobId); - - return g.cache("replicated").peek(jobId); - } - }; - - // If only local node in the grid. - if (g.cluster().nodes().size() == 1) - mappings.put(job, g.cluster().localNode()); - else { - ClusterNode n = nodeIter.hasNext() ? nodeIter.next() : - (nodeIter = g.cluster().forRemotes().nodes().iterator()).next(); - - mappings.put(job, n); - } - } - - return mappings; - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { - TestObject o = res.getData(); - - X.println("Received job result from node [resId=" + o.getId() + ", node=" + res.getNode().id() + ']'); - - return super.result(res, rcvd); - } - - /** {@inheritDoc} */ - @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - X.println(">>> Reducing task..."); - - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridNodeStartup.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridNodeStartup.java b/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridNodeStartup.java deleted file mode 100644 index 609cfdb..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/mapper/GridNodeStartup.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.mapper; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; - -import javax.swing.*; - -/** - * Starts up an empty node with cache configuration. - * You can also start a stand-alone GridGain instance by passing the path - * to configuration file to {@code 'ggstart.{sh|bat}'} script, like so: - * {@code 'ggstart.sh examples/config/example-cache.xml'}. - * <p> - * The difference is that running this class from IDE adds all example classes to classpath - * but running from command line doesn't. - */ -public class GridNodeStartup { - /** - * Start up an empty node with specified cache configuration. - * - * @param args Command line arguments, none required. - * @throws IgniteCheckedException If example execution failed. - */ - public static void main(String[] args) throws IgniteCheckedException { - try (Ignite ignored = G.start("examples/config/example-cache.xml")) { - // Wait until Ok is pressed. - JOptionPane.showMessageDialog( - null, - new JComponent[] { - new JLabel("GridGain started."), - new JLabel("Press OK to stop GridGain.") - }, - "GridGain", - JOptionPane.INFORMATION_MESSAGE - ); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/mapper/TestObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/mapper/TestObject.java b/modules/core/src/test/java/org/gridgain/loadtests/mapper/TestObject.java deleted file mode 100644 index 9325e8e..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/mapper/TestObject.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.mapper; - -import org.apache.ignite.cache.query.*; - -import java.io.*; - -/** - * Test object. - */ -public class TestObject implements Serializable { - /** ID. */ - @GridCacheQuerySqlField(unique = true) - private int id; - - /** Text. */ - @GridCacheQuerySqlField - private String txt; - - /** - * @param id ID. - * @param txt Text. - */ - public TestObject(int id, String txt) { - this.id = id; - this.txt = txt; - } - - /** - * @return ID. - */ - public int getId() { - return id; - } - - /** - * @return Text. - */ - public String getText() { - return txt; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/mergesort/GridMergeSortLoadTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/mergesort/GridMergeSortLoadTask.java b/modules/core/src/test/java/org/gridgain/loadtests/mergesort/GridMergeSortLoadTask.java deleted file mode 100644 index f072e16..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/mergesort/GridMergeSortLoadTask.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.mergesort; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.util.*; - -/** - * A task that performs distributed Merge Sort. - */ -public class GridMergeSortLoadTask extends ComputeTaskSplitAdapter<int[], int[]> { - /** Injected Grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - - /** - * Receives the array to sort, splits it into 2 arrays, and returns 2 - * jobs that perform that task recursively, each for the corresponding part - * of the array. Each recursive task will return a sorted array. - * - * Because this is a recursive algorithm and we cannot hold threads are every - * recursion step, we use the <i>continuation</i> mechanism - * ({@link org.apache.ignite.compute.ComputeJobContext} methods {@code holdcc()} and {@code callcc()}) - * to pause the parent tasks while the child tasks are running. Otherwise we may - * run out of threads. - * - * @param gridSize Number of available grid nodes. Note that returned number of - * jobs can be less, equal or greater than this grid size. - * @param initArr Array to sort. - * @return 2 jobs that will run the sort recursively for each part of the array. - */ - @Override protected Collection<ComputeJob> split(int gridSize, int[] initArr) { - Collection<ComputeJob> jobs = new LinkedList<>(); - - for (final int[] arr : splitArray(initArr)) { - jobs.add(new ComputeJobAdapter() { - // Auto-inject job context. - @IgniteJobContextResource - private ComputeJobContext jobCtx; - - // Task execution result future. - private ComputeTaskFuture<int[]> fut; - - @Override public Object execute() throws IgniteCheckedException { - if (arr.length == 1) - return arr; - - // Future is null before holdcc() is called and - // not null after callcc() is called. - if (fut == null) { - IgniteCompute comp = ignite.compute().enableAsync(); - - // Launch the recursive child task asynchronously. - comp.execute(new GridMergeSortLoadTask(), arr); - - fut = comp.future(); - - // Add a listener to the future, that will resume the - // parent task once the child one is completed. - fut.listenAsync(new CI1<IgniteFuture<int[]>>() { - @Override public void apply(IgniteFuture<int[]> fut) { - // CONTINUATION: - // ============= - // Resume suspended job execution. - jobCtx.callcc(); - } - }); - - // CONTINUATION: - // ============= - // Suspend job execution to be continued later and - // release the executing thread. - return jobCtx.holdcc(); - } - else { - assert fut.isDone(); - - // Return the result of a completed child task. - return fut.get(); - } - } - }); - } - - return jobs; - } - - /** - * This method is called when both child jobs are completed, and is a - * Reduce step of Merge Sort algorithm. - * - * On this step we do a merge of 2 sorted arrays, produced by child tasks, - * into a 1 sorted array. - * - * @param results The child task execution results (sorted arrays). - * @return A merge result: single sorted array. - */ - @Override public int[] reduce(List<ComputeJobResult> results) { - if (results.size() == 1) // This is in case we have a single-element array. - return results.get(0).getData(); - - assert results.size() == 2; - - int[] arr1 = results.get(0).getData(); - int[] arr2 = results.get(1).getData(); - - return mergeArrays(arr1, arr2); - } - - /** - * Splits the array into two parts. - * - * If array size is odd, then the second part is one element - * greater than the first one. Otherwise, the parts have - * equal size. - * - * @param arr Array to split. - * @return Split result: a collection of 2 arrays. - */ - private Iterable<int[]> splitArray(int[] arr) { - int len1 = arr.length / 2; - int len2 = len1 + arr.length % 2; - - int[] a1 = new int[len1]; - int[] a2 = new int[len2]; - - System.arraycopy(arr, 0, a1, 0, len1); - System.arraycopy(arr, len1, a2, 0, len2); - - return Arrays.asList(a1, a2); - } - - /** - * Performs a merge of 2 arrays. This method runs the element-by-element - * comparison of specified arrays and stacks the least elements into a - * resulting array. - * - * @param arr1 First array. - * @param arr2 Second array. - * @return The merged array, in which any element from the first half is less or equal - * than any element from the second half. - */ - private int[] mergeArrays(int[] arr1, int[] arr2) { - int[] ret = new int[arr1.length + arr2.length]; - - int i1 = 0; - int i2 = 0; - - // Merge 2 arrays into a resulting array - for (int i = 0; i < ret.length; i++) { - if (i1 >= arr1.length) { - System.arraycopy(arr2, i2, ret, i, arr2.length - i2); // Copy the remainder of an array. - - break; - } - else if (i2 >= arr2.length) { - System.arraycopy(arr1, i1, ret, i, arr1.length - i1); // Copy the remainder of an array. - - break; - } - else - ret[i] = arr1[i1] <= arr2[i2] ? arr1[i1++] : arr2[i2++]; - } - - return ret; - } -} - http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/mergesort/GridMergeSortLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/mergesort/GridMergeSortLoadTest.java b/modules/core/src/test/java/org/gridgain/loadtests/mergesort/GridMergeSortLoadTest.java deleted file mode 100644 index 86eb647..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/mergesort/GridMergeSortLoadTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.mergesort; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.lang.*; -import org.gridgain.testframework.*; - -import java.io.*; -import java.util.*; - -/** - * Merge-sort load test. - */ -public class GridMergeSortLoadTest { - /** Default size of real array to be sorted. */ - private static final int ARR_SIZE = 100000; - - /** A fixed seed that gives equal arrays each run. */ - public static final int FIXED_SEED = 23; - - /** - * Entry point for this test. Generates an input array of random integers, - * starts the grid, and launches the job. The method then waits for job completion. - * - * @param args Program arguments. - * <ul> - * <li> - * <b>1-st argument:</b> absolute or relative path to the configuration - * file for the grid (optional). - * </li> - * <li> - * <b>2-nd argument:</b> size of the generated array (optional, default: {@code 100000}). - * </li> - * <li> - * <b>3-nd argument:</b> size of the generated array for "warm up" (optional, default: {@code 10000}). - * </li> - * </ul> - * @throws IgniteCheckedException In case of error. - * @throws IOException In case of file output error. - */ - public static void main(String[] args) throws IgniteCheckedException, IOException { - GridFileLock fileLock = GridLoadTestUtils.fileLock(); - - fileLock.lock(); - - try { - String outputFileName = args.length >= 1 ? args[0] : null; - - try (Ignite g = G.start(args.length >= 2 ? args[1] : "modules/core/src/test/config/load/merge-sort-base.xml")) { - int arrRealSize = args.length > 1 ? Integer.parseInt(args[1]) : ARR_SIZE; - - int arrWarmupSize = args.length > 2 ? Integer.parseInt(args[2]) : ARR_SIZE; - - X.println("Test is being executed on the gird of size " + g.cluster().nodes().size() + "."); - - X.println("Performing warm up sorting of int[" + arrWarmupSize + "]..."); - - sort(g, arrWarmupSize); - - X.println("Cleaning up after warm-up..."); - - // Run GC on all nodes. - g.compute().broadcast(new GridAbsClosure() { - @Override public void apply() { - System.gc(); - } - }); - - X.println("Performing measured sorting of int[" + arrRealSize + "]..."); - - long execTime = sort(g, arrRealSize); - - if (outputFileName != null) - GridLoadTestUtils.appendLineToFile( - outputFileName, - "%s,%d", - GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()), - execTime / 1000); - } - } - finally { - fileLock.close(); - } - } - - /** - * Generates a random array and performs merge sort benchmark. - * - * @param g Grid to run sorting on. - * @param size Size of the generated array, which we sort. - * @return Sort execution time in milliseconds. - * @throws IgniteCheckedException If failed. - */ - private static long sort(Ignite g, int size) throws IgniteCheckedException { - int[] bigArr = generateRandomArray(size); - - X.println("Array is generated."); - - long startTime = System.currentTimeMillis(); - - g.compute().execute(new GridMergeSortLoadTask(), bigArr); - - long execTime = System.currentTimeMillis() - startTime; - - X.println("Sorting is finished. Execution time: " + execTime + "ms"); - - return execTime; - } - - /** - * Generates an int array of random elements of specified size. - * - * @param size Size of an array. - * @return The generated array. - */ - private static int[] generateRandomArray(int size) { - int[] ret = new int[size]; - - Random rnd = new Random(FIXED_SEED); - - for (int i = 0; i < ret.length; i++) - ret[i] = rnd.nextInt(100); - - return ret; - } -}