http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java new file mode 100644 index 0000000..42bd8e0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.direct.newnodes; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.thread.*; +import org.gridgain.grid.loadtest.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.config.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Base class for single split on new nodes tests. + */ +@GridCommonTest(group = "Load Test") +public abstract class GridSingleSplitsNewNodesAbstractLoadTest extends GridCommonAbstractTest { + /** + * @param cfg Current configuration. + * @return Configured discovery spi. + */ + protected abstract DiscoverySpi getDiscoverySpi(IgniteConfiguration cfg); + + /** + * @return Discovery spi heartbeat frequency. + */ + protected abstract int getHeartbeatFrequency(); + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TcpCommunicationSpi()); + + cfg.setDiscoverySpi(getDiscoverySpi(cfg)); + + cfg.setMetricsHistorySize(1000); + + // Set up new executor service because we have 1 per test and thus all + // nodes have the same executor service. As soon as node get stopped + // it stops executor service and may fail active nodes. + cfg.setExecutorService(new IgniteThreadPoolExecutor()); + + ((ThreadPoolExecutor)cfg.getExecutorService()).prestartAllCoreThreads(); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return (getTestDurationInMinutes() + 1) * 60 * 1000; + } + + /** + * @return Time for load test in minutes. 
+ */ + private int getTestDurationInMinutes() { + return Integer.valueOf(GridTestProperties.getProperty("load.test.duration")); + } + + /** + * @return Number of threads for the test. + */ + private int getThreadCount() { + return Integer.valueOf(GridTestProperties.getProperty("load.test.threadnum")); + } + + /** + * @return Number of nodes to start/stop. + */ + protected int getNodeCount() { + return Integer.valueOf(GridTestProperties.getProperty("load.test.nodenum")); + } + + /** + * Load test grid. + * + * @throws Exception If task execution failed. + */ + public void testLoad() throws Exception { + final Ignite ignite = startGrid(getTestGridName()); + + try { + final long end = getTestDurationInMinutes() * 60 * 1000 + System.currentTimeMillis(); + + // Warm up. + ignite.compute().execute(GridSingleSplitNewNodesTestTask.class.getName(), 3); + + info("Load test will be executed for '" + getTestDurationInMinutes() + "' mins."); + info("Thread count: " + getThreadCount()); + + final GridLoadTestStatistics stats = new GridLoadTestStatistics(); + final AtomicInteger gridIdx = new AtomicInteger(0); + + for (int i = 0; i < getNodeCount(); i++) { + new Thread(new Runnable() { + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public void run() { + try { + while (end - System.currentTimeMillis() > 0 + && !Thread.currentThread().isInterrupted()) { + int idx = gridIdx.incrementAndGet(); + + startGrid(idx); + + Thread.sleep(getHeartbeatFrequency() * 3); + + stopGrid(idx); + + Thread.sleep(getHeartbeatFrequency() * 3); + } + } + catch (Throwable e) { + error("Failed to start new node.", e); + + fail(); + } + } + + }, "grid-notaop-nodes-load-test").start(); + } + + GridTestUtils.runMultiThreaded(new Runnable() { + /** {@inheritDoc} */ + @Override public void run() { + IgniteCompute comp = ignite.compute().enableAsync(); + + while (end - System.currentTimeMillis() > 0 + && !Thread.currentThread().isInterrupted()) { + long start = System.currentTimeMillis(); + + 
try { + int levels = 3; + + comp.execute(new GridSingleSplitNewNodesTestTask(), levels); + + ComputeTaskFuture<Integer> fut = comp.future(); + + int res = fut.get(); + + if (res != levels) + fail("Received wrong result [expected=" + levels + ", actual=" + res + ']'); + + long taskCnt = + stats.onTaskCompleted(fut, levels, System.currentTimeMillis() - start); + + if (taskCnt % 500 == 0) + info(stats.toString()); + } + catch (Throwable e) { + error("Failed to execute grid task.", e); + + fail(); + } + } + } + }, getThreadCount(), "grid-notaop-load-test"); + info("Final test statistics: " + stats); + } + finally { + G.stop(getTestGridName(), false); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesMulticastLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesMulticastLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesMulticastLoadTest.java new file mode 100644 index 0000000..2b379fb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesMulticastLoadTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.direct.newnodes; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.gridgain.testframework.junits.common.*; + +/** + * + */ +@GridCommonTest(group = "Load Test") +public class GridSingleSplitsNewNodesMulticastLoadTest extends GridSingleSplitsNewNodesAbstractLoadTest { + /** {@inheritDoc} */ + @Override protected DiscoverySpi getDiscoverySpi(IgniteConfiguration cfg) { + DiscoverySpi discoSpi = cfg.getDiscoverySpi(); + + assert discoSpi instanceof TcpDiscoverySpi : "Wrong default SPI implementation."; + + ((TcpDiscoverySpi)discoSpi).setHeartbeatFrequency(getHeartbeatFrequency()); + + return discoSpi; + } + + /** {@inheritDoc} */ + @Override protected int getHeartbeatFrequency() { + return 3000; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/package.html b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/package.html new file mode 100644 index 0000000..135eb1a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains internal tests or test related classes and interfaces. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/redeploy/GridSingleSplitsRedeployLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/redeploy/GridSingleSplitsRedeployLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/redeploy/GridSingleSplitsRedeployLoadTest.java new file mode 100644 index 0000000..1a3982c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/redeploy/GridSingleSplitsRedeployLoadTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.direct.redeploy; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.gridgain.grid.loadtest.*; +import org.apache.ignite.spi.communication.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.config.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.concurrent.*; + +/** + * Single splits redeploy load test. + */ +@GridCommonTest(group = "Load Test") +public class GridSingleSplitsRedeployLoadTest extends GridCommonAbstractTest { + /** Load test task type ID. */ + public static final String TASK_NAME = "org.gridgain.grid.tests.p2p.GridSingleSplitTestTask"; + + /** */ + public GridSingleSplitsRedeployLoadTest() { + super(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + CommunicationSpi commSpi = new TcpCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + DiscoverySpi discoSpi = new TcpDiscoverySpi(); + + cfg.setDiscoverySpi(discoSpi); + + ((ThreadPoolExecutor)cfg.getExecutorService()).prestartAllCoreThreads(); + + cfg.setDeploymentMode(IgniteDeploymentMode.CONTINUOUS); + + return cfg; + } + + /** + * @return Test timeout. 
+ */ + @Override protected long getTestTimeout() { + return (getTestDurationInMinutes() + 1) * 60 * 1000; + } + + /** + * @return Time for load test in minutes. + */ + private int getTestDurationInMinutes() { + return Integer.valueOf(GridTestProperties.getProperty("load.test.duration")); + } + + /** + * @return Number of threads for the test. + */ + private int getThreadCount() { + //return 1; + return Integer.valueOf(GridTestProperties.getProperty("load.test.threadnum")); + } + + /** + * Load test grid. + * + * @throws Exception If task execution failed. + */ + public void testLoad() throws Exception { + final Ignite ignite = G.ignite(getTestGridName()); + + final long end = getTestDurationInMinutes() * 60 * 1000 + System.currentTimeMillis(); + + ignite.compute().localDeployTask(loadTaskClass(), loadTaskClass().getClassLoader()); + + info("Load test will be executed for '" + getTestDurationInMinutes() + "' mins."); + info("Thread count: " + getThreadCount()); + + final GridLoadTestStatistics stats = new GridLoadTestStatistics(); + + new Thread(new Runnable() { + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public void run() { + try { + while (end - System.currentTimeMillis() > 0) { + Class<? 
extends ComputeTask<?, ?>> cls = loadTaskClass(); + + // info("Deploying class: " + cls); + + ignite.compute().localDeployTask(cls, cls.getClassLoader()); + + Thread.sleep(1000); + } + } + catch (Exception e) { + error("Failed to deploy grid task.", e); + + fail(); + } + } + + }, "grid-notaop-deploy-load-test").start(); + + + GridTestUtils.runMultiThreaded(new Runnable() { + /** {@inheritDoc} */ + @Override public void run() { + try { + int levels = 3; + + while (end - System.currentTimeMillis() > 0) { + long start = System.currentTimeMillis(); + + // info("Executing task: " + TASK_NAME); + + ComputeTaskFuture<Integer> fut = ignite.compute().execute(TASK_NAME, levels); + + int res = fut.get(); + + if (res != levels) + fail("Received wrong result [expected=" + levels + ", actual=" + res + ']'); + + long taskCnt = stats.onTaskCompleted(fut, levels, System.currentTimeMillis() - start); + + if (taskCnt % 500 == 0) + info(stats.toString()); + } + } + catch (IgniteCheckedException e) { + error("Failed to execute grid task.", e); + + fail(); + } + } + }, getThreadCount(), "grid-notaop-load-test"); + + info("Final test statistics: " + stats); + } + + /** + * @return Loaded task class. + * @throws Exception If failed. + */ + @SuppressWarnings({"unchecked"}) + private Class<? extends ComputeTask<?, ?>> loadTaskClass() throws Exception { + return (Class<? 
extends ComputeTask<?, ?>>)getExternalClassLoader().loadClass(TASK_NAME); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/redeploy/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/redeploy/package.html b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/redeploy/package.html new file mode 100644 index 0000000..135eb1a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/redeploy/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains internal tests or test related classes and interfaces. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTest.java new file mode 100644 index 0000000..cf35d4f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.direct.session; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.gridgain.grid.loadtest.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.config.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.concurrent.atomic.*; + +/** + * Session load test. 
+ */ +@GridCommonTest(group = "Load Test") +public class GridSessionLoadTest extends GridCommonAbstractTest { + /** */ + public GridSessionLoadTest() { + super(/*start Grid*/true); + } + + /** + * @return Time for load test in minutes. + */ + private int getTestDurationInMinutes() { + return Integer.valueOf(GridTestProperties.getProperty("load.test.duration")); + } + + /** + * @return Number of threads for the test. + */ + private int getThreadCount() { + return Integer.valueOf(GridTestProperties.getProperty("load.test.threadnum")); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return (getTestDurationInMinutes() + 1) * 60 * 1000; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testSessionLoad() throws Exception { + final Ignite ignite = G.ignite(getTestGridName()); + + assert ignite != null; + + ignite.compute().localDeployTask(GridSessionLoadTestTask.class, GridSessionLoadTestTask.class.getClassLoader()); + + final long end = getTestDurationInMinutes() * 60 * 1000 + System.currentTimeMillis(); + + info("Load test will be executed for '" + getTestDurationInMinutes() + "' mins."); + info("Thread count: " + getThreadCount()); + + final GridLoadTestStatistics stats = new GridLoadTestStatistics(); + + final AtomicBoolean failed = new AtomicBoolean(false); + + GridTestUtils.runMultiThreaded(new Runnable() { + /** {@inheritDoc} */ + @Override public void run() { + try { + while (end - System.currentTimeMillis() > 0) { + long start = System.currentTimeMillis(); + + ComputeTaskFuture<?> fut = ignite.compute().withTimeout(10000). 
+ execute(GridSessionLoadTestTask.class.getName(), ignite.cluster().nodes().size()); + + Object res = fut.get(); + + assert (Boolean)res; + + long taskCnt = stats.onTaskCompleted(fut, 1, System.currentTimeMillis() - start); + + if (taskCnt % 500 == 0) + info(stats.toString()); + } + } + catch (Throwable e) { + error("Load test failed.", e); + + failed.set(true); + } + } + }, getThreadCount(), "grid-load-test-thread"); + + info("Final test statistics: " + stats); + + if (failed.get()) + fail(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTestJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTestJob.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTestJob.java new file mode 100644 index 0000000..9d931e1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTestJob.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.direct.session; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; + +import java.io.*; + +/** + * Session load test job. + */ +public class GridSessionLoadTestJob extends ComputeJobAdapter { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession taskSes; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + public GridSessionLoadTestJob() { + // No-op. + } + + /** + * @param arg Argument. + */ + public GridSessionLoadTestJob(String arg) { + super(arg); + } + + /** {@inheritDoc} */ + @Override public Serializable execute() throws IgniteCheckedException { + assert taskSes != null; + + Object arg = argument(0); + + assert arg != null; + + Serializable ser = taskSes.getAttribute(arg); + + assert ser != null; + + int val = (Integer)ser + 1; + + // Generate garbage. + for (int i = 0; i < 10; i++) + taskSes.setAttribute(arg, i); + + // Set final value. + taskSes.setAttribute(arg, val); + + if (log.isDebugEnabled()) + log.debug("Set session attribute [name=" + arg + ", value=" + val + ']'); + + return val; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTestTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTestTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTestTask.java new file mode 100644 index 0000000..6f7cff4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/GridSessionLoadTestTask.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.direct.session; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; + +import java.util.*; + +/** + * Session load test task. + */ +public class GridSessionLoadTestTask extends ComputeTaskAdapter<Integer, Boolean> { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession taskSes; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + private Map<String, Integer> params; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Integer arg) throws IgniteCheckedException { + assert taskSes != null; + assert arg != null; + assert arg > 0; + + Map<GridSessionLoadTestJob, ClusterNode> map = new HashMap<>(subgrid.size()); + + Iterator<ClusterNode> iter = subgrid.iterator(); + + Random rnd = new Random(); + + params = new HashMap<>(arg); + + Collection<UUID> assigned = new ArrayList<>(subgrid.size()); + + for (int i = 0; i < arg; i++) { + // Recycle iterator. 
+ if (!iter.hasNext()) + iter = subgrid.iterator(); + + String paramName = UUID.randomUUID().toString(); + + int paramVal = rnd.nextInt(); + + taskSes.setAttribute(paramName, paramVal); + + ClusterNode node = iter.next(); + + assigned.add(node.id()); + + map.put(new GridSessionLoadTestJob(paramName), node); + + params.put(paramName, paramVal); + + if (log.isDebugEnabled()) + log.debug("Set session attribute [name=" + paramName + ", value=" + paramVal + ']'); + } + + taskSes.setAttribute("nodes", assigned); + + return map; + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert taskSes != null; + assert results != null; + assert params != null; + assert !params.isEmpty(); + assert results.size() == params.size(); + + Map<String, Integer> receivedParams = new HashMap<>(); + + boolean allAttrReceived = false; + + int cnt = 0; + + while (!allAttrReceived && cnt++ < 3) { + allAttrReceived = true; + + for (Map.Entry<String, Integer> entry : params.entrySet()) { + assert taskSes.getAttribute(entry.getKey()) != null; + + Integer newVal = (Integer)taskSes.getAttribute(entry.getKey()); + + assert newVal != null; + + receivedParams.put(entry.getKey(), newVal); + + if (newVal != entry.getValue() + 1) + allAttrReceived = false; + } + + if (!allAttrReceived) { + try { + Thread.sleep(100); + } + catch (InterruptedException e) { + throw new IgniteCheckedException("Thread interrupted.", e); + } + } + } + + if (log.isDebugEnabled()) { + for (Map.Entry<String, Integer> entry : receivedParams.entrySet()) { + log.debug("Received session attr value [name=" + entry.getKey() + ", val=" + entry.getValue() + + ", expected=" + (params.get(entry.getKey()) + 1) + ']'); + } + } + + return allAttrReceived; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/package.html 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/package.html b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/package.html new file mode 100644 index 0000000..135eb1a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/session/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains internal tests or test related classes and interfaces. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTest.java new file mode 100644 index 0000000..563d1a4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.direct.stealing; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.gridgain.grid.loadtest.*; +import org.apache.ignite.spi.collision.jobstealing.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.failover.jobstealing.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.config.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +@GridCommonTest(group = "Load Test") +public class GridStealingLoadTest extends GridCommonAbstractTest { + /** */ + public GridStealingLoadTest() { + super(false); + } + + /** + * @return Number of threads for the test. + */ + private int getThreadCount() { + return Integer.valueOf(GridTestProperties.getProperty("load.test.threadnum")); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi(); + + assert colSpi.getActiveJobsThreshold() == JobStealingCollisionSpi.DFLT_ACTIVE_JOBS_THRESHOLD; + assert colSpi.getWaitJobsThreshold() == JobStealingCollisionSpi.DFLT_WAIT_JOBS_THRESHOLD; + + // One job at a time. + colSpi.setActiveJobsThreshold(5); + colSpi.setWaitJobsThreshold(0); + colSpi.setMessageExpireTime(5000); + + JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi(); + + // Verify defaults. 
+ assert failSpi.getMaximumFailoverAttempts() == JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS; + + DiscoverySpi discoSpi = new TcpDiscoverySpi(); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCollisionSpi(colSpi); + cfg.setFailoverSpi(failSpi); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testStealingLoad() throws Exception { + final Ignite ignite = grid(0); + + assert ignite != null; + + assert !ignite.cluster().forRemotes().nodes().isEmpty() : "Test requires at least 2 nodes."; + + final UUID stealingNodeId = ignite.cluster().forRemotes().nodes().iterator().next().id(); + + info("Set stealing node id to: " + stealingNodeId); + + ignite.compute().localDeployTask(GridStealingLoadTestTask.class, GridStealingLoadTestTask.class.getClassLoader()); + + final long end = 2 * 60 * 1000 + System.currentTimeMillis(); + + info("Test timeout: " + getTestTimeout() + " ms."); + info("Thread count: " + getThreadCount()); + + final GridLoadTestStatistics stats = new GridLoadTestStatistics(); + + final AtomicBoolean failed = new AtomicBoolean(false); + + final AtomicInteger stolen = new AtomicInteger(0); + + GridTestUtils.runMultiThreaded(new Runnable() { + /** {@inheritDoc} */ + @Override public void run() { + try { + while (end - System.currentTimeMillis() > 0) { + long start = System.currentTimeMillis(); + + // Pass stealing node id. + ComputeTaskFuture<?> fut = ignite.compute().withTimeout(20000). 
+ execute(GridStealingLoadTestTask.class.getName(), stealingNodeId); + + stolen.addAndGet((Integer)fut.get()); + + long taskCnt = stats.onTaskCompleted(fut, 1, System.currentTimeMillis() - start); + + if (taskCnt % 500 == 0) + info("Stats [stats=" + stats.toString() + ", stolen=" + stolen + ']'); + } + } + catch (Throwable e) { + error("Load test failed.", e); + + failed.set(true); + } + } + }, getThreadCount(), "grid-load-test-thread"); + + info("Final test statistics: " + stats); + + if (failed.get()) + fail(); + + assert stolen.get() != 0: "No jobs were stolen by stealing node."; + + info("Stolen jobs: " + stolen.get()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTestJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTestJob.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTestJob.java new file mode 100644 index 0000000..41f9a65 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTestJob.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.direct.stealing; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; + +import java.io.*; +import java.util.*; + +/** + * Stealing load test. + */ +public class GridStealingLoadTestJob extends ComputeJobAdapter { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteJobContextResource + private ComputeJobContext ctx; + + /** {@inheritDoc} */ + @Override public Serializable execute() throws IgniteCheckedException { + UUID nodeId = ignite.configuration().getNodeId(); + + if (log.isDebugEnabled()) + log.debug("Executing job on node [nodeId=" + nodeId + ", jobId=" + ctx.getJobId() + ']'); + + try { + Thread.sleep(500); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + + // Here we gonna return node id which executed this job. + // Hopefully it would be stealing node. + return nodeId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTestTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTestTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTestTask.java new file mode 100644 index 0000000..76682f8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/GridStealingLoadTestTask.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.direct.stealing; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; + +import java.util.*; + +/** + * Stealing load test task. + */ +public class GridStealingLoadTestTask extends ComputeTaskAdapter<UUID, Integer> { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession taskSes; + + /** */ + private UUID stealingNodeId; + + /** */ + private int stolenJobs; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, UUID arg) throws IgniteCheckedException { + assert arg != null; + assert subgrid.size() > 1: "Test requires at least 2 nodes. One with load and another one to steal."; + + int jobsNum = subgrid.size(); + + Map<GridStealingLoadTestJob, ClusterNode> map = new HashMap<>(jobsNum); + + stealingNodeId = arg; + + Iterator<ClusterNode> iter = subgrid.iterator(); + + Collection<UUID> assigned = new ArrayList<>(subgrid.size()); + + for (int i = 0; i < jobsNum; i++) { + ClusterNode node = null; + + boolean nextNodeFound = false; + + while (iter.hasNext() && !nextNodeFound) { + node = iter.next(); + + // Do not map jobs to the stealing node. + if (!node.id().equals(stealingNodeId)) + nextNodeFound = true; + + // Recycle iterator. 
+ if (!iter.hasNext()) + iter = subgrid.iterator(); + } + + assert node != null; + + assigned.add(node.id()); + + map.put(new GridStealingLoadTestJob(), node); + } + + taskSes.setAttribute("nodes", assigned); + + return map; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results != null; + + for (ComputeJobResult res : results) { + if (res.getData() != null && stealingNodeId.equals(res.getData())) + stolenJobs++; + } + + return stolenJobs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/package.html b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/package.html new file mode 100644 index 0000000..135eb1a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/stealing/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains internal tests or test related classes and interfaces. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java new file mode 100644 index 0000000..f84b1ed --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.discovery; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * + */ +public class GridGcTimeoutTest { + /** */ + public static final String CFG_PATH = "modules/core/src/test/config/discovery-stress.xml"; + + /** */ + public static final int VALUE_SIZE = 1024; + + /** + * @param args Args. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("InfiniteLoopStatement") + public static void main(String[] args) throws IgniteCheckedException { + Ignite g = G.start(U.resolveGridGainUrl(CFG_PATH)); + + IgniteDataLoader<Long, String> ldr = g.dataLoader(null); + + ldr.perNodeBufferSize(16 * 1024); + + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < VALUE_SIZE - 42; i++) + sb.append('a'); + + String str = sb.toString(); + long cntr = 0; + + while (true) { + ldr.addData(cntr++, UUID.randomUUID() + str); + + if (cntr % 1000000 == 0) + X.println("!!! Entries added: " + cntr); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java new file mode 100644 index 0000000..7cb1387 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.dsi; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class GridDsiClient implements Callable { + /** Stats update interval in seconds. */ + private static final int UPDATE_INTERVAL_SEC = 10; + + /** Grid. */ + private static Ignite g; + + /** Transaction count. */ + private static AtomicLong txCnt = new AtomicLong(); + + /** Latency. */ + private static AtomicLong latency = new AtomicLong(); + + /** Submit time. */ + private static GridAtomicLong submitTime = new GridAtomicLong(); + + /** Server stats. */ + private static volatile T3<Long, Integer, Integer> srvStats; + + /** Finish flag. */ + private static AtomicBoolean finish = new AtomicBoolean(); + + /** Terminal ID. */ + private String terminalId; + + /** Node ID. */ + private UUID nodeId; + + /** + * Client constructor. + * + * @param terminalId Terminal ID. + * @param nodeId Node ID. 
+ */ + GridDsiClient(String terminalId, UUID nodeId) { + this.terminalId = terminalId; + this.nodeId = nodeId; + } + + /** + * Predicate to look for server node. + * + * @return {@code true} if node segment is 'server'. + */ + public static IgnitePredicate<ClusterNode> serverNode() { + return new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return "server".equals(node.attribute("segment")); + } + }; + } + + /** + * Predicate to look for client node. + * + * @return {@code true} if node segment is 'client'. + */ + public static IgnitePredicate<ClusterNode> clientNode() { + return new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return "client".equals(node.attribute("segment")); + } + }; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "InfiniteLoopStatement"}) + @Nullable @Override public Object call() throws Exception { + IgniteCompute comp = g.compute(g.cluster().forPredicate(serverNode())).enableAsync(); + + while (!finish.get()) { + try { + long t0 = System.currentTimeMillis(); + + long submitTime1 = t0; + + comp.execute(GridDsiRequestTask.class, new GridDsiMessage(terminalId, nodeId)); + + ComputeTaskFuture<T3<Long, Integer, Integer>> f1 = comp.future(); + + submitTime.setIfGreater(System.currentTimeMillis() - submitTime1); + + T3<Long, Integer, Integer> res1 = f1.get(); + + submitTime1 = System.currentTimeMillis(); + + comp.execute(GridDsiResponseTask.class, new GridDsiMessage(terminalId, nodeId)); + + ComputeTaskFuture<T3<Long, Integer, Integer>> f2 = comp.future(); + + submitTime.setIfGreater(System.currentTimeMillis() - submitTime1); + + T3<Long, Integer, Integer> res2 = f2.get(); + + long t1 = System.currentTimeMillis(); + + txCnt.incrementAndGet(); + + latency.addAndGet(t1 - t0); + + if (res1 != null) + srvStats = res1; + + if (res2 != null) + srvStats = res2; + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + } + + return null; + } + + 
/** + * Method to print request statistics. + */ + private static void displayReqCount() { + new Thread(new Runnable() { + @SuppressWarnings({"BusyWait", "InfiniteLoopStatement"}) + @Override public void run() { + int interval = 30; + + while (true) { + long cnt0 = txCnt.get(); + long lt0 = latency.get(); + + try { + Thread.sleep(interval * 1000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + + long cnt1 = txCnt.get(); + long lt1 = latency.get(); + + X.println(">>>"); + X.println(">>> Transaction/s: " + (cnt1 - cnt0) / interval); + X.println( + ">>> Avg Latency: " + ((cnt1 - cnt0) > 0 ? (lt1 - lt0) / (cnt1 - cnt0) + "ms" : "invalid")); + X.println(">>> Max Submit Time: " + submitTime.getAndSet(0)); + } + } + }).start(); + } + + /** + * Execute DSI load client. + * + * @param args Command line arguments, two required - first one is the number of threads, + * second one should point to the Spring XML configuration file. + * @throws Exception If client fails. + */ + @SuppressWarnings("unchecked") + public static void main(String[] args) throws Exception { + GridFileLock fileLock = GridLoadTestUtils.fileLock(); + + fileLock.lock(true); // Get shared lock, allowing multiple instances. + + try { + Ignition.start(args.length < 4 ? "modules/core/src/test/config/load/dsi-load-client.xml" : args[3]); + + Thread collector = null; + + Thread timer = null; + + try { + g = Ignition.ignite("dsi"); + + int noThreads = Integer.parseInt(args[0]); + + final int duration = args.length < 2 ? 0 : Integer.parseInt(args[1]); + + final String outputFileName = args.length < 3 ? 
null : args[2]; + + X.println("Thread count: " + noThreads); + + Collection<ClusterNode> srvNodes = g.cluster().forPredicate(serverNode()).nodes(); + + if (srvNodes.isEmpty()) { + X.println("No server nodes available"); + + System.exit(-1); + } + + X.println("No of servers: " + srvNodes.size()); + + int srvMaxNoTerminals = noThreads / srvNodes.size(); + + if (srvMaxNoTerminals * srvNodes.size() != noThreads) { + noThreads = srvMaxNoTerminals * srvNodes.size(); + + X.println("Using " + noThreads + " threads instead to ensure equal distribution of terminals"); + } + + Collection<Callable<Object>> clients = new ArrayList<>(noThreads); + + // No 2 client should use the same simulator. + HashMap<UUID, Collection<String>> terminals = (HashMap<UUID, Collection<String>>) + g.cache("CLIENT_PARTITIONED_CACHE").get("terminals"); + + if (terminals == null) { + X.println(">>> Terminals map has not been initialized."); + + terminals = new HashMap<>(srvNodes.size()); + + // Distribute terminals evenly across all servers. + for (ClusterNode node : srvNodes) { + UUID srvrId = node.id(); + + X.println(">>> Node ID: " + srvrId); + + Collection<String> list = terminals.get(srvrId); + + if (list == null) + list = new ArrayList<>(0); + + int terminalsPerSrv = 0; + + int tid = 0; // Terminal ID. + + while (true) { + String terminalId = String.valueOf(++tid); + + // Server partition cache. 
+ if (!srvrId.equals(g.cluster().mapKeyToNode("PARTITIONED_CACHE", terminalId).id())) + continue; + + if (terminalsPerSrv < srvMaxNoTerminals) { + list.add(terminalId); + + clients.add(new GridDsiClient(terminalId, srvrId)); + + terminalsPerSrv++; + + X.println("Terminal ID: " + terminalId); + } + else + break; + } + + terminals.put(srvrId, list); + } + + g.cache("CLIENT_PARTITIONED_CACHE").putx("terminals", terminals); + } + else { + X.println(">>> Terminals map has been initialized."); + + for (Map.Entry<UUID, Collection<String>> e : terminals.entrySet()) { + X.println(">>> Node ID: " + e.getKey()); + + for (String s : e.getValue()) { + clients.add(new GridDsiClient(s, e.getKey())); + + X.println("Terminal ID: " + s); + } + } + } + + if (duration > 0) { + timer = new Thread(new Runnable() { + @Override public void run() { + try { + Thread.sleep(duration * 1000); + + finish.set(true); + } + catch (InterruptedException ignored) { + // No-op. + } + } + }); + timer.start(); + } + + collector = new Thread(new Runnable() { + @SuppressWarnings({"BusyWait", "InfiniteLoopStatement"}) + @Override public void run() { + long txPerSecond = -1; + long avgLatency = -1; + long maxSubmitTime = -1; + T3<Long, Integer, Integer> sst = null; + + try { + while (!finish.get()) { + long cnt0 = txCnt.get(); + long lt0 = latency.get(); + + Thread.sleep(UPDATE_INTERVAL_SEC * 1000); + + long cnt1 = txCnt.get(); + long lt1 = latency.get(); + + X.println(">>>"); + + txPerSecond = (cnt1 - cnt0) / UPDATE_INTERVAL_SEC; + X.println(">>> Transaction/s: " + txPerSecond); + + avgLatency = (cnt1 - cnt0) > 0 ? (lt1 - lt0) / (cnt1 - cnt0) : -1; + X.println( + ">>> Avg Latency: " + (avgLatency >= 0 ? 
avgLatency + "ms" : "invalid")); + + maxSubmitTime = submitTime.getAndSet(0); + X.println(">>> Max Submit Time: " + maxSubmitTime); + + sst = srvStats; + + if (sst != null) + X.println(String.format(">>> Server stats: [tx/sec=%d, nearSize=%d, dhtSize=%d]", + sst.get1(), sst.get2(), sst.get3())); + } + } + catch (InterruptedException ignored) { + X.println(">>> Interrupted."); + + Thread.currentThread().interrupt(); + } + + // Output data to a file, if specified. + if (outputFileName != null) { + X.println("Writing client results to a file: " + outputFileName); + + try { + GridLoadTestUtils.appendLineToFile( + outputFileName, + "%s,%d,%d,%d", + GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()), + txPerSecond, + avgLatency, + maxSubmitTime); + } + catch (IOException e) { + X.println("Failed to write client results: ", e); + } + + if (sst != null) { + String srvOutputFileName = outputFileName + "-server"; + + X.println("Writing server results to a file: " + srvOutputFileName); + + try { + GridLoadTestUtils.appendLineToFile( + srvOutputFileName, + "%s,%d,%d,%d", + GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()), + sst.get1(), + sst.get2(), + sst.get3()); + } + catch (IOException e) { + X.println("Failed to write server results: ", e); + } + } + } + } + }); + collector.start(); + + ExecutorService pool = Executors.newFixedThreadPool(noThreads); + + pool.invokeAll(clients); + + collector.interrupt(); + + pool.shutdown(); + } + finally { + if (collector != null && !collector.isInterrupted()) + collector.interrupt(); + + if (timer != null) + timer.interrupt(); + + Ignition.stopAll(true); + } + } + finally { + fileLock.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiLifecycleBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiLifecycleBean.java 
b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiLifecycleBean.java new file mode 100644 index 0000000..b12c94d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiLifecycleBean.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.dsi; + +import org.apache.ignite.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.resources.*; +import org.springframework.context.*; + +/** + * + */ +public class GridDsiLifecycleBean implements LifecycleBean { + /** + * Ignite instance will be automatically injected. For additional resources + * that can be injected into lifecycle beans see + * {@link org.apache.ignite.lifecycle.LifecycleBean} documentation. 
+ */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @SuppressWarnings("UnusedDeclaration") + @IgniteSpringApplicationContextResource + private ApplicationContext springCtx; + + /** {@inheritDoc} */ + @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { + switch (evt) { + case BEFORE_GRID_START: + break; + + case AFTER_GRID_START: + ignite.cache("PARTITIONED_CACHE").dataStructures().atomicSequence("ID", 0, true); + break; + + case BEFORE_GRID_STOP: + break; + + case AFTER_GRID_STOP: + break; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiMessage.java b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiMessage.java new file mode 100644 index 0000000..2f6bb2a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiMessage.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.loadtests.dsi;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Serializable message that carries a terminal ID together with a node ID.
+ * Both values are mutable through the setters below.
+ */
+public class GridDsiMessage implements Serializable {
+    /** Terminal ID. */
+    private String terminalId;
+
+    /** Node ID. */
+    private UUID nodeId;
+
+    /**
+     * Message constructor.
+     *
+     * @param terminalId Terminal ID.
+     * @param nodeId Node ID.
+     */
+    public GridDsiMessage(String terminalId, UUID nodeId) {
+        this.terminalId = terminalId;
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return Terminal ID.
+     */
+    public String getTerminalId() {
+        return terminalId;
+    }
+
+    /**
+     * Sets terminal ID.
+     *
+     * @param terminalId Terminal ID.
+     */
+    public void setTerminalId(String terminalId) {
+        this.terminalId = terminalId;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Sets node ID.
+     *
+     * @param nodeId Node ID.
+     */
+    public void setNodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiPerfJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiPerfJob.java b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiPerfJob.java
new file mode 100644
index 0000000..a2ec40c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiPerfJob.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.dsi; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class GridDsiPerfJob extends ComputeJobAdapter { + /** */ + private static final ConcurrentMap<Thread, ConcurrentMap<String, T3<Long, Long, Long>>> timers = + new ConcurrentHashMap8<>(); + + /** */ + private static final long PRINT_FREQ = 10000; + + /** */ + private static final GridAtomicLong lastPrint = new GridAtomicLong(); + + /** */ + private static final long MAX = 5000; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @GridCacheName + private String cacheName = "PARTITIONED_CACHE"; + + /** + * @param msg Message. + */ + public GridDsiPerfJob(@Nullable GridDsiMessage msg) { + super(msg); + } + + /** + * @return Message. 
+ */ + @Nullable private GridDsiMessage message() { + return argument(0); + } + + /** + * @return Terminal ID. + */ + @GridCacheAffinityKeyMapped + @Nullable public String terminalId() { + GridDsiMessage msg = message(); + + return msg != null ? msg.getTerminalId() : null; + } + + /** + * @return Result. + */ + @SuppressWarnings("ConstantConditions") + @Override public Object execute() { + ClusterNodeLocalMap<String, T2<AtomicLong, AtomicLong>> nodeLoc = ignite.cluster().nodeLocalMap(); + + T2<AtomicLong, AtomicLong> cntrs = nodeLoc.get("cntrs"); + + if (cntrs == null) { + T2<AtomicLong, AtomicLong> other = nodeLoc.putIfAbsent("cntrs", + cntrs = new T2<>(new AtomicLong(), new AtomicLong(System.currentTimeMillis()))); + + if (other != null) + cntrs = other; + } + + long cnt = cntrs.get1().incrementAndGet(); + + GridNearCacheAdapter near = (GridNearCacheAdapter)((GridKernal) ignite).internalCache(cacheName); + GridDhtCacheAdapter dht = near.dht(); + + doWork(); + + long start = cntrs.get2().get(); + + long now = System.currentTimeMillis(); + + long dur = now - start; + + if (dur > 20000 && cntrs.get2().compareAndSet(start, System.currentTimeMillis())) { + cntrs.get1().set(0); + + long txPerSec = cnt / (dur / 1000); + + X.println("Stats [tx/sec=" + txPerSec + ", nearSize=" + near.size() + ", dhtSize=" + dht.size() + ']'); + + return new T3<>(txPerSec, near.size(), dht.size()); + } + + return null; + } + + /** + * @param name Timer name to start. 
+ */ + private void startTimer(String name) { + ConcurrentMap<String, T3<Long, Long, Long>> m = timers.get(Thread.currentThread()); + + if (m == null) { + ConcurrentMap<String, T3<Long, Long, Long>> old = timers.putIfAbsent(Thread.currentThread(), + m = new ConcurrentHashMap8<>()); + + if (old != null) + m = old; + } + + T3<Long, Long, Long> t = m.get(name); + + if (t == null) { + T3<Long, Long, Long> old = m.putIfAbsent(name, t = new T3<>()); + + if (old != null) + t = old; + } + + t.set1(System.currentTimeMillis()); + t.set2(0L); + } + + /** + * @param name Timer name to stop. + */ + @SuppressWarnings("ConstantConditions") + private void stopTimer(String name) { + ConcurrentMap<String, T3<Long, Long, Long>> m = timers.get(Thread.currentThread()); + + T3<Long, Long, Long> t = m.get(name); + + assert t != null; + + long now = System.currentTimeMillis(); + + t.set2(now); + + t.set3(Math.max(t.get3() == null ? 0 : t.get3(), now - t.get1())); + } + + /** + * + */ + private void printTimers() { + long now = System.currentTimeMillis(); + + if (lastPrint.get() + PRINT_FREQ < now && lastPrint.setIfGreater(now)) { + Map<String, Long> maxes = new HashMap<>(); + + for (Map.Entry<Thread, ConcurrentMap<String, T3<Long, Long, Long>>> e1 : timers.entrySet()) { + for (Map.Entry<String, T3<Long, Long, Long>> e2 : e1.getValue().entrySet()) { + T3<Long, Long, Long> t = e2.getValue(); + + Long start = t.get1(); + Long end = t.get2(); + + assert start != null; + assert end != null; + + long duration = end == 0 ? now - start : end - start; + + long max = t.get3() == null ? 
duration : t.get3(); + + if (duration < 0) + duration = now - start; + + if (duration > MAX) + X.println("Maxed out timer [name=" + e2.getKey() + ", duration=" + duration + + ", ongoing=" + (end == 0) + ", thread=" + e1.getKey().getName() + ']'); + + Long cmax = maxes.get(e2.getKey()); + + if (cmax == null || max > cmax) + maxes.put(e2.getKey(), max); + + t.set3(null); + } + } + + for (Map.Entry<String, Long> e : maxes.entrySet()) + X.println("Timer [name=" + e.getKey() + ", maxTime=" + e.getValue() + ']'); + + X.println(">>>>"); + } + } + + /** + * + */ + private void doWork() { + GridCache cache = ignite.cache(cacheName); + + assert cache != null; + + // This is instead of former code to find request + // with some ID. + try { + getId(); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + + startTimer("getSession"); + + String terminalId = terminalId(); + + assert terminalId != null; + + GridDsiSession ses = null; + + try { + ses = (GridDsiSession)get(GridDsiSession.getCacheKey(terminalId)); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + + stopTimer("getSession"); + + if (ses == null) + ses = new GridDsiSession(terminalId); + + try { + try (IgniteTx tx = cache.txStart()) { + GridDsiRequest req = new GridDsiRequest(getId()); + + req.setMessageId(getId()); + + startTimer("putRequest"); + + put(req, req.getCacheKey(terminalId)); + + stopTimer("putRequest"); + + for (int i = 0; i < 5; i++) { + GridDsiResponse rsp = new GridDsiResponse(getId()); + + startTimer("putResponse-" + i); + + put(rsp, rsp.getCacheKey(terminalId)); + + stopTimer("putResponse-" + i); + } + + startTimer("putSession"); + + put(ses, ses.getCacheKey()); + + stopTimer("putSession"); + + startTimer("commit"); + + tx.commit(); + + stopTimer("commit"); + } + } + catch (Exception e) { + e.printStackTrace(); + } + + printTimers(); + } + + /** + * @return ID. + * @throws IgniteCheckedException If failed. 
+ */ + private long getId() throws IgniteCheckedException { + GridCache<Object, Object> cache = ignite.cache(cacheName); + + assert cache != null; + + GridCacheAtomicSequence seq = cache.dataStructures().atomicSequence("ID", 0, true); + + return seq.incrementAndGet(); + } + + /** + * @param o Object. + * @param cacheKey Key. + * @throws IgniteCheckedException If failed. + */ + private void put(Object o, Object cacheKey) throws IgniteCheckedException { + GridCache<Object, Object> cache = ignite.cache(cacheName); + + assert cache != null; + + GridCacheEntry<Object, Object> entry = cache.entry(cacheKey); + + if (entry != null) + entry.setx(o); + } + + /** + * @param key Key. + * @return Object. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("ConstantConditions") + private <T> Object get(Object key) throws IgniteCheckedException { + return ignite.cache(cacheName).get(key); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiRequest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiRequest.java new file mode 100644 index 0000000..824949d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiRequest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.dsi; + +import org.apache.ignite.cache.affinity.*; + +import java.io.*; + +/** + * + */ +public class GridDsiRequest implements Serializable { + /** */ + private Long id; + + /** */ + @SuppressWarnings({"UnusedDeclaration", "FieldCanBeLocal"}) + private long msgId; + + /** */ + @SuppressWarnings("UnusedDeclaration") + private long txId; + + /** + * @param id ID. + */ + public GridDsiRequest(long id) { + this.id = id; + } + + /** + * @param msgId Message ID. + */ + public void setMessageId(long msgId) { + this.msgId = msgId; + } + + /** + * @param terminalId Terminal ID. + * @return Cache key. + */ + public Object getCacheKey(String terminalId){ + return new RequestKey(id, terminalId); + } + + /** + * + */ + @SuppressWarnings("PackageVisibleInnerClass") + static class RequestKey implements Serializable { + /** */ + private Long key; + + /** */ + @SuppressWarnings("UnusedDeclaration") + @GridCacheAffinityKeyMapped + private String terminalId; + + /** + * @param key Key. + * @param terminalId Terminal ID. 
+ */ + RequestKey(long key, String terminalId) { + this.key = key; + this.terminalId = terminalId; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof RequestKey && key.equals(((RequestKey)obj).key); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiRequestTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiRequestTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiRequestTask.java new file mode 100644 index 0000000..7375ca7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiRequestTask.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.dsi; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; + +/** + * + */ +public class GridDsiRequestTask extends ComputeTaskSplitAdapter<GridDsiMessage, T3<Long, Integer, Integer>> { + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int arg0, GridDsiMessage msg) throws IgniteCheckedException { + return Collections.singletonList(new GridDsiPerfJob(msg)); + } + + /** {@inheritDoc} */ + @Override public T3<Long, Integer, Integer> reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert results.size() == 1; + + return results.get(0).getData(); + } +}