http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java new file mode 100644 index 0000000..aa2d9d4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.job; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Runnable with continuous task submission and result checking. + */ +public class GridJobLoadTestSubmitter implements Runnable { + /** */ + public static final int TIMEOUT = 120000; + + /** Grid where all tasks should be submitted. */ + private final Ignite ignite; + + /** Params of simulated jobs. */ + private final GridJobLoadTestParams params; + + /** Time to sleep between task submissions. */ + private final long submitDelay; + + /** Submission/cancel ratio. */ + private final int cancelRate; + + /** List of futures for submitted tasks. */ + private final List<ComputeTaskFuture<Integer>> futures = new LinkedList<>(); + + /** Counter to implement fixed submit/cancel ratio. */ + private int iteration; + + /** + * @param ignite Grid where all tasks should be submitted. + * @param params Params of simulated jobs. + * @param cancelRate Submission/cancel ratio. + * @param submitDelay Time to sleep between task submissions. + */ + public GridJobLoadTestSubmitter(Ignite ignite, GridJobLoadTestParams params, int cancelRate, long submitDelay) { + this.ignite = ignite; + this.params = params; + this.cancelRate = cancelRate; + this.submitDelay = submitDelay; + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public void run() { + IgniteCompute comp = ignite.compute().enableAsync(); + + while (true) { + checkCompletion(); + + performCancel(); + + try { + Thread.sleep(submitDelay); + } + catch (InterruptedException ignored) { + return; + } + + try { + comp.withTimeout(TIMEOUT).execute(GridJobLoadTestTask.class, params); + + futures.add(comp.<Integer>future()); + } + catch (IgniteCheckedException e) { + // Should not be thrown since uses asynchronous execution. + throw new IgniteException(e); + } + } + } + + /** + * Finds completed tasks in the queue and removes them. + */ + private void checkCompletion() { + for (Iterator<ComputeTaskFuture<Integer>> iter = futures.iterator(); iter.hasNext();) { + ComputeTaskFuture<Integer> fut = iter.next(); + + if (fut.isDone()) { + try { + Integer res = fut.get(); + + assert res == params.getJobsCount() : + "Task returned wrong result [taskIs=" + fut.getTaskSession().getId() + ", result=" + res + "]"; + + ignite.log().info(">>> Task completed successfully. Task id: " + fut.getTaskSession().getId()); + } + catch (IgniteFutureCancelledException ignored) { + ignite.log().info(">>> Task cancelled: " + fut.getTaskSession().getId()); + } + catch (IgniteCheckedException e) { + ignite.log().warning( + ">>> Get operation for completed task failed: " + fut.getTaskSession().getId(), e); + } + finally { + iter.remove(); + } + } + } + } + + /** + * Cancel a random task when required. + */ + private void performCancel() { + iteration++; + + if (iteration % cancelRate == 0) { + // Here we should have mostly running futures so just pick one. + ComputeTaskFuture<Integer> futToCancel = futures.get( new Random().nextInt(futures.size()) ); + + try { + futToCancel.cancel(); + ignite.log().info("Task canceled: " + futToCancel.getTaskSession().getId()); + } + catch (IgniteCheckedException e) { + ignite.log().warning(">>> Future cancellation failed: " + futToCancel.getTaskSession().getId(), e); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestTask.java new file mode 100644 index 0000000..9745c5f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestTask.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.job; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Test task for {@link GridJobLoadTest} + */ +public class GridJobLoadTestTask extends ComputeTaskAdapter<GridJobLoadTestParams, Integer> { + /**{@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable GridJobLoadTestParams arg) + throws IgniteCheckedException { + assert !subgrid.isEmpty(); + + Map<ComputeJob, ClusterNode> jobs = new HashMap<>(); + + for (int i = 0; i < arg.getJobsCount(); i++) + jobs.put( + new GridJobLoadTestJob( + /*only on the first step*/i == 0, + arg.getJobFailureProbability(), + arg.getExecutionDuration(), + arg.getCompletionDelay()), + subgrid.get(0)); + + return jobs; + } + + /** + * Always trying to failover job, except failed assertions. + * + * {@inheritDoc} + */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { + return res.getException() == null ? WAIT : + res.getException().getCause() instanceof AssertionError ? REDUCE : FAILOVER; + } + + /**{@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + int sum = 0; + + for (ComputeJobResult r: results) { + if (!r.isCancelled() && r.getException() == null) + sum += r.<Integer>getData(); + } + + return sum; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/job/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/package.html b/modules/core/src/test/java/org/apache/ignite/loadtests/job/package.html new file mode 100644 index 0000000..e291932 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> +<!-- Package description. --> +Contains internal tests or test related classes and interfaces. +</body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/lang/GridConcurrentLinkedHashMapBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/lang/GridConcurrentLinkedHashMapBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/lang/GridConcurrentLinkedHashMapBenchmark.java new file mode 100644 index 0000000..7e034ee --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/lang/GridConcurrentLinkedHashMapBenchmark.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.lang; + +import org.apache.ignite.internal.util.typedef.*; +import org.jdk8.backport.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Benchmark for different accessors in {@link ConcurrentLinkedHashMap}. + */ +public class GridConcurrentLinkedHashMapBenchmark { + /** Number of keys to use in benchmark. */ + private static final int KEY_RANGE = 1000; + + /** Amount of writes from total number of iterations. */ + private static final double WRITE_RATE = 0.2; + + /** + * @param args Command-line arguments. + */ + public static void main(String[] args) { + System.out.printf("%8s, %8s, %12s, %12s, %12s, %8s, %8s\n", + "Method", "Threads", "It./s.", "It./s.*th.", "Iters.", "Time", "Writes"); + + for (int i = 1; i <= 32; i*=2) + testGet(i, WRITE_RATE); + + for (int i = 1; i <= 32; i*=2) + testGetSafe(i, WRITE_RATE); + } + + /** + * Tests {@link ConcurrentLinkedHashMap#getSafe(Object)} method. + * + * @param threadCnt Number of threads to run. + * @param writeProportion Amount of writes from total number of iterations. + */ + public static void testGetSafe(int threadCnt, double writeProportion) { + test(new C2<Integer, ConcurrentLinkedHashMap<Integer, Integer>, Integer>() { + @Override public Integer apply(Integer key, ConcurrentLinkedHashMap<Integer, Integer> map) { + return map.getSafe(key); + } + + @Override public String toString() { + return "getSafe"; + } + }, threadCnt, writeProportion); + } + + /** + * Tests {@link ConcurrentLinkedHashMap#get(Object)} method. + * + * @param threadCnt Number of threads to run. + * @param writeProportion Amount of writes from total number of iterations. + */ + public static void testGet(int threadCnt, double writeProportion) { + test(new C2<Integer, ConcurrentLinkedHashMap<Integer, Integer>, Integer>() { + @Override public Integer apply(Integer key, ConcurrentLinkedHashMap<Integer, Integer> map) { + return map.get(key); + } + + @Override public String toString() { + return "get"; + } + }, threadCnt, writeProportion); + } + + /** + * Test a generic access method on map. + * + * @param readOp Access method to test. + * @param threadCnt Number of threads to run. + * @param writeProportion Amount of writes from total number of iterations. + */ + @SuppressWarnings({"BusyWait"}) + private static void test(C2<Integer, ConcurrentLinkedHashMap<Integer, Integer>, Integer> readOp, int threadCnt, + double writeProportion) { + assert writeProportion < 1; + + ConcurrentLinkedHashMap<Integer, Integer> map = new ConcurrentLinkedHashMap<>(); + + CyclicBarrier barrier = new CyclicBarrier(threadCnt + 1); + + Collection<TestThread> threads = new ArrayList<>(threadCnt); + + for (int i = 0; i < threadCnt; i++) { + TestThread thread = new TestThread(readOp, map, writeProportion, barrier); + + threads.add(thread); + + thread.start(); + } + + long start; + + try { + // Wait threads warm-up. + while (barrier.getNumberWaiting() != threadCnt) + Thread.sleep(1); + + // Starting test and letting it run for 1 minute. + barrier.await(); + + start = System.currentTimeMillis(); + + Thread.sleep(60000); + } + catch (InterruptedException ignored) { + return; + } + catch (BrokenBarrierException e) { + e.printStackTrace(); + + return; + } + + for (TestThread th : threads) + th.interrupt(); + + try { + for (TestThread th : threads) + th.join(); + } + catch (InterruptedException ignored) { + return; + } + + long time = System.currentTimeMillis() - start; + + long iters = 0; + + for (TestThread th : threads) + iters += th.iterations(); + + System.out.printf("%8s, %8d, %12d, %12d, %12d, %8.3f, %8.2f\n", + readOp.toString(), threadCnt, 1000*iters/time, 1000*iters/(time*threadCnt), iters, time/(double)1000, writeProportion); + } + + /** + * Test thread. Performs read/write operations on the given map + * with the given ration. + */ + private static class TestThread extends Thread { + /** */ + private final C2<Integer, ConcurrentLinkedHashMap<Integer, Integer>, Integer> readOp; + + /** */ + private final ConcurrentLinkedHashMap<Integer, Integer> map; + + /** */ + private final double writeProportion; + + /** */ + private final CyclicBarrier barrier; + + /** */ + private final Random rnd = new Random(); + + /** Number of passed run iterations. */ + private long iterations; + + /** + * @param readOp Read operation to test. + * @param map Map to test. + * @param writeProportion Amount of writes from total number of iterations. + * @param barrier Barrier to await before starting 'clear' run. + */ + TestThread(final C2<Integer, ConcurrentLinkedHashMap<Integer, Integer>, Integer> readOp, + ConcurrentLinkedHashMap<Integer, Integer> map, double writeProportion, CyclicBarrier barrier) { + this.readOp = readOp; + this.map = map; + this.writeProportion = writeProportion; + this.barrier = barrier; + } + + /** {@inheritDoc} */ + @Override public void run() { + for (int i = 0; i < 1000000; i++) + doIteration(); + + try { + barrier.await(); + } + catch (InterruptedException ignored) { + return; + } + catch (BrokenBarrierException e) { + e.printStackTrace(); + + return; + } + + while (!interrupted()) { + doIteration(); + + iterations++; + } + } + + /** + * Performs test iteration. + */ + private void doIteration() { + Integer key = rnd.nextInt(KEY_RANGE); + + if (rnd.nextDouble() <= writeProportion) + map.put(key, rnd.nextInt()); + else + readOp.apply(key, map); + } + + /** + * @return Number of passes iterations. + */ + public long iterations() { + return iterations; + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java new file mode 100644 index 0000000..e3adec6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.mapper; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; + +/** + * Continuous mapper load test. + */ +public class GridContinuousMapperLoadTest1 { + /** + * Main method. + * + * @param args Parameters. + * @throws IgniteCheckedException If failed. + */ + public static void main(String[] args) throws IgniteCheckedException { + try (Ignite g = G.start("examples/config/example-cache.xml")) { + int max = 30000; + + IgniteDataLoader<Integer, TestObject> ldr = g.dataLoader("replicated"); + + for (int i = 0; i < max; i++) + ldr.addData(i, new TestObject(i, "Test object: " + i)); + + // Wait for loader to complete. + ldr.close(false); + + X.println("Populated replicated cache."); + + g.compute().execute(new GridContinuousMapperTask1(), max); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java new file mode 100644 index 0000000..c9ba9ab --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.mapper; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Continuous mapper load test. + */ +public class GridContinuousMapperLoadTest2 { + /** + * Main method. + * + * @param args Parameters. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + final AtomicInteger jobIdGen = new AtomicInteger(); + final AtomicInteger sentJobs = new AtomicInteger(); + + final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); + + /** Worker thread. */ + Thread t = new Thread("mapper-worker") { + @Override public void run() { + try { + while (!Thread.currentThread().isInterrupted()) + queue.put(jobIdGen.incrementAndGet()); + } + catch (InterruptedException ignore) { + // No-op. + } + } + }; + + Ignite g = G.start("examples/config/example-cache.xml"); + + try { + int max = 20000; + + IgniteDataLoader<Integer, TestObject> ldr = g.dataLoader("replicated"); + + for (int i = 0; i < max; i++) + ldr.addData(i, new TestObject(i, "Test object: " + i)); + + // Wait for loader to complete. + ldr.close(false); + + X.println("Populated replicated cache."); + + t.start(); + + while (sentJobs.get() < max) { + int[] jobIds = new int[10]; + + for (int i = 0; i < jobIds.length; i++) + jobIds[i] = queue.take(); + + sentJobs.addAndGet(10); + + g.compute().execute(new GridContinuousMapperTask2(), jobIds); + } + } + finally { + t.interrupt(); + + t.join(); + + G.stopAll(false); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java new file mode 100644 index 0000000..264e8de --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.mapper; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Test task. + */ +@SuppressWarnings("TransientFieldNotInitialized") +@ComputeTaskNoResultCache +public class GridContinuousMapperTask1 extends ComputeTaskAdapter<Integer, Integer> { + /** Job ID generator. */ + private final transient AtomicInteger jobIdGen = new AtomicInteger(); + + /** Mapper. */ + @IgniteTaskContinuousMapperResource + private ComputeTaskContinuousMapper mapper; + + /** Grid. */ + @IgniteInstanceResource + private Ignite g; + + /** Blocking queue. */ + private final transient LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); + + /** Sent jobs count. */ + private final transient AtomicInteger sentJobs = new AtomicInteger(); + + /** Maximum number of executions. */ + private transient int maxExecs; + + /** Worker thread. */ + private transient Thread t = new Thread("mapper-worker") { + @Override public void run() { + try { + while (!Thread.currentThread().isInterrupted()) + queue.put(jobIdGen.getAndIncrement()); + } + catch (InterruptedException ignore) { + // No-op. + } + } + }; + + /** + * Sends job to node. + * + * @param n Node. + * @throws IgniteCheckedException If failed. + */ + private void sendJob(ClusterNode n) throws IgniteCheckedException { + try { + int jobId = queue.take(); + + sentJobs.incrementAndGet(); + + mapper.send(new ComputeJobAdapter(jobId) { + @IgniteInstanceResource + private Ignite g; + + @Override public Object execute() { + Integer jobId = argument(0); + + X.println(">>> Received job for ID: " + jobId); + + return g.cache("replicated").peek(jobId); + } + }, n); + } + catch (InterruptedException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Integer arg) + throws IgniteCheckedException { + maxExecs = arg; + + // Start worker thread. + t.start(); + + if (g.cluster().nodes().size() == 1) + sendJob(g.cluster().localNode()); + else + for (ClusterNode n : g.cluster().forRemotes().nodes()) + sendJob(n); + + return null; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { + if (res.getException() != null) + throw new IgniteCheckedException(res.getException()); + + TestObject o = res.getData(); + + assert o != null; + + X.println("Received job result from node [resId=" + o.getId() + ", node=" + res.getNode().id() + ']'); + + if (sentJobs.get() < maxExecs) + sendJob(res.getNode()); + + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + X.println(">>> Reducing task..."); + + t.interrupt(); + + try { + t.join(); + } + catch (InterruptedException e) { + throw new IgniteCheckedException(e); + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java new file mode 100644 index 0000000..693930a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.mapper; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Test task. + */ +public class GridContinuousMapperTask2 extends ComputeTaskAdapter<int[], Integer> { + /** Grid. */ + @IgniteInstanceResource + private Ignite g; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable int[] jobIds) + throws IgniteCheckedException { + Map<ComputeJob, ClusterNode> mappings = new HashMap<>(jobIds.length); + + Iterator<ClusterNode> nodeIter = g.cluster().forRemotes().nodes().iterator(); + + for (int jobId : jobIds) { + ComputeJob job = new ComputeJobAdapter(jobId) { + @IgniteInstanceResource + private Ignite g; + + @Override public Object execute() { + Integer jobId = argument(0); + + X.println(">>> Received job for ID: " + jobId); + + return g.cache("replicated").peek(jobId); + } + }; + + // If only local node in the grid. + if (g.cluster().nodes().size() == 1) + mappings.put(job, g.cluster().localNode()); + else { + ClusterNode n = nodeIter.hasNext() ? nodeIter.next() : + (nodeIter = g.cluster().forRemotes().nodes().iterator()).next(); + + mappings.put(job, n); + } + } + + return mappings; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { + TestObject o = res.getData(); + + X.println("Received job result from node [resId=" + o.getId() + ", node=" + res.getNode().id() + ']'); + + return super.result(res, rcvd); + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + X.println(">>> Reducing task..."); + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridNodeStartup.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridNodeStartup.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridNodeStartup.java new file mode 100644 index 0000000..22f906f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridNodeStartup.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.mapper; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; + +import javax.swing.*; + +/** + * Starts up an empty node with cache configuration. + * You can also start a stand-alone GridGain instance by passing the path + * to configuration file to {@code 'ggstart.{sh|bat}'} script, like so: + * {@code 'ggstart.sh examples/config/example-cache.xml'}. + * <p> + * The difference is that running this class from IDE adds all example classes to classpath + * but running from command line doesn't. + */ +public class GridNodeStartup { + /** + * Start up an empty node with specified cache configuration. + * + * @param args Command line arguments, none required. + * @throws IgniteCheckedException If example execution failed. + */ + public static void main(String[] args) throws IgniteCheckedException { + try (Ignite ignored = G.start("examples/config/example-cache.xml")) { + // Wait until Ok is pressed. + JOptionPane.showMessageDialog( + null, + new JComponent[] { + new JLabel("GridGain started."), + new JLabel("Press OK to stop GridGain.") + }, + "GridGain", + JOptionPane.INFORMATION_MESSAGE + ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java new file mode 100644 index 0000000..874cfa4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.mapper; + +import org.apache.ignite.cache.query.*; + +import java.io.*; + +/** + * Test object. + */ +public class TestObject implements Serializable { + /** ID. */ + @GridCacheQuerySqlField(unique = true) + private int id; + + /** Text. */ + @GridCacheQuerySqlField + private String txt; + + /** + * @param id ID. + * @param txt Text. + */ + public TestObject(int id, String txt) { + this.id = id; + this.txt = txt; + } + + /** + * @return ID. + */ + public int getId() { + return id; + } + + /** + * @return Text. + */ + public String getText() { + return txt; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java new file mode 100644 index 0000000..8e0a59c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.mergesort; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; + +/** + * A task that performs distributed Merge Sort. + */ +public class GridMergeSortLoadTask extends ComputeTaskSplitAdapter<int[], int[]> { + /** Injected Grid instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * Receives the array to sort, splits it into 2 arrays, and returns 2 + * jobs that perform that task recursively, each for the corresponding part + * of the array. Each recursive task will return a sorted array. + * + * Because this is a recursive algorithm and we cannot hold threads are every + * recursion step, we use the <i>continuation</i> mechanism + * ({@link org.apache.ignite.compute.ComputeJobContext} methods {@code holdcc()} and {@code callcc()}) + * to pause the parent tasks while the child tasks are running. Otherwise we may + * run out of threads. + * + * @param gridSize Number of available grid nodes. Note that returned number of + * jobs can be less, equal or greater than this grid size. + * @param initArr Array to sort. + * @return 2 jobs that will run the sort recursively for each part of the array. + */ + @Override protected Collection<ComputeJob> split(int gridSize, int[] initArr) { + Collection<ComputeJob> jobs = new LinkedList<>(); + + for (final int[] arr : splitArray(initArr)) { + jobs.add(new ComputeJobAdapter() { + // Auto-inject job context. + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + // Task execution result future. + private ComputeTaskFuture<int[]> fut; + + @Override public Object execute() throws IgniteCheckedException { + if (arr.length == 1) + return arr; + + // Future is null before holdcc() is called and + // not null after callcc() is called. + if (fut == null) { + IgniteCompute comp = ignite.compute().enableAsync(); + + // Launch the recursive child task asynchronously. + comp.execute(new GridMergeSortLoadTask(), arr); + + fut = comp.future(); + + // Add a listener to the future, that will resume the + // parent task once the child one is completed. + fut.listenAsync(new CI1<IgniteFuture<int[]>>() { + @Override public void apply(IgniteFuture<int[]> fut) { + // CONTINUATION: + // ============= + // Resume suspended job execution. + jobCtx.callcc(); + } + }); + + // CONTINUATION: + // ============= + // Suspend job execution to be continued later and + // release the executing thread. + return jobCtx.holdcc(); + } + else { + assert fut.isDone(); + + // Return the result of a completed child task. + return fut.get(); + } + } + }); + } + + return jobs; + } + + /** + * This method is called when both child jobs are completed, and is a + * Reduce step of Merge Sort algorithm. + * + * On this step we do a merge of 2 sorted arrays, produced by child tasks, + * into a 1 sorted array. + * + * @param results The child task execution results (sorted arrays). + * @return A merge result: single sorted array. + */ + @Override public int[] reduce(List<ComputeJobResult> results) { + if (results.size() == 1) // This is in case we have a single-element array. + return results.get(0).getData(); + + assert results.size() == 2; + + int[] arr1 = results.get(0).getData(); + int[] arr2 = results.get(1).getData(); + + return mergeArrays(arr1, arr2); + } + + /** + * Splits the array into two parts. + * + * If array size is odd, then the second part is one element + * greater than the first one. Otherwise, the parts have + * equal size. + * + * @param arr Array to split. + * @return Split result: a collection of 2 arrays. + */ + private Iterable<int[]> splitArray(int[] arr) { + int len1 = arr.length / 2; + int len2 = len1 + arr.length % 2; + + int[] a1 = new int[len1]; + int[] a2 = new int[len2]; + + System.arraycopy(arr, 0, a1, 0, len1); + System.arraycopy(arr, len1, a2, 0, len2); + + return Arrays.asList(a1, a2); + } + + /** + * Performs a merge of 2 arrays. This method runs the element-by-element + * comparison of specified arrays and stacks the least elements into a + * resulting array. + * + * @param arr1 First array. + * @param arr2 Second array. + * @return The merged array, in which any element from the first half is less or equal + * than any element from the second half. + */ + private int[] mergeArrays(int[] arr1, int[] arr2) { + int[] ret = new int[arr1.length + arr2.length]; + + int i1 = 0; + int i2 = 0; + + // Merge 2 arrays into a resulting array + for (int i = 0; i < ret.length; i++) { + if (i1 >= arr1.length) { + System.arraycopy(arr2, i2, ret, i, arr2.length - i2); // Copy the remainder of an array. + + break; + } + else if (i2 >= arr2.length) { + System.arraycopy(arr1, i1, ret, i, arr1.length - i1); // Copy the remainder of an array. + + break; + } + else + ret[i] = arr1[i1] <= arr2[i2] ? arr1[i1++] : arr2[i2++]; + } + + return ret; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTest.java new file mode 100644 index 0000000..130df68 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.mergesort; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.lang.*; +import org.gridgain.testframework.*; + +import java.io.*; +import java.util.*; + +/** + * Merge-sort load test. + */ +public class GridMergeSortLoadTest { + /** Default size of real array to be sorted. */ + private static final int ARR_SIZE = 100000; + + /** A fixed seed that gives equal arrays each run. */ + public static final int FIXED_SEED = 23; + + /** + * Entry point for this test. Generates an input array of random integers, + * starts the grid, and launches the job. The method then waits for job completion. + * + * @param args Program arguments. + * <ul> + * <li> + * <b>1-st argument:</b> absolute or relative path to the configuration + * file for the grid (optional). + * </li> + * <li> + * <b>2-nd argument:</b> size of the generated array (optional, default: {@code 100000}). + * </li> + * <li> + * <b>3-nd argument:</b> size of the generated array for "warm up" (optional, default: {@code 10000}). + * </li> + * </ul> + * @throws IgniteCheckedException In case of error. + * @throws IOException In case of file output error. + */ + public static void main(String[] args) throws IgniteCheckedException, IOException { + GridFileLock fileLock = GridLoadTestUtils.fileLock(); + + fileLock.lock(); + + try { + String outputFileName = args.length >= 1 ? args[0] : null; + + try (Ignite g = G.start(args.length >= 2 ? args[1] : "modules/core/src/test/config/load/merge-sort-base.xml")) { + int arrRealSize = args.length > 1 ? Integer.parseInt(args[1]) : ARR_SIZE; + + int arrWarmupSize = args.length > 2 ? Integer.parseInt(args[2]) : ARR_SIZE; + + X.println("Test is being executed on the gird of size " + g.cluster().nodes().size() + "."); + + X.println("Performing warm up sorting of int[" + arrWarmupSize + "]..."); + + sort(g, arrWarmupSize); + + X.println("Cleaning up after warm-up..."); + + // Run GC on all nodes. + g.compute().broadcast(new GridAbsClosure() { + @Override public void apply() { + System.gc(); + } + }); + + X.println("Performing measured sorting of int[" + arrRealSize + "]..."); + + long execTime = sort(g, arrRealSize); + + if (outputFileName != null) + GridLoadTestUtils.appendLineToFile( + outputFileName, + "%s,%d", + GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()), + execTime / 1000); + } + } + finally { + fileLock.close(); + } + } + + /** + * Generates a random array and performs merge sort benchmark. + * + * @param g Grid to run sorting on. + * @param size Size of the generated array, which we sort. + * @return Sort execution time in milliseconds. + * @throws IgniteCheckedException If failed. + */ + private static long sort(Ignite g, int size) throws IgniteCheckedException { + int[] bigArr = generateRandomArray(size); + + X.println("Array is generated."); + + long startTime = System.currentTimeMillis(); + + g.compute().execute(new GridMergeSortLoadTask(), bigArr); + + long execTime = System.currentTimeMillis() - startTime; + + X.println("Sorting is finished. Execution time: " + execTime + "ms"); + + return execTime; + } + + /** + * Generates an int array of random elements of specified size. + * + * @param size Size of an array. + * @return The generated array. + */ + private static int[] generateRandomArray(int size) { + int[] ret = new int[size]; + + Random rnd = new Random(FIXED_SEED); + + for (int i = 0; i < ret.length; i++) + ret[i] = rnd.nextInt(100); + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java new file mode 100644 index 0000000..d6f2d07 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.nio; + +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * + */ +public class GridNioBenchmarkClient { + /** */ + private static final int INITIAL_PACKET_SIZE = 65536; + + /** */ + private static final byte[] INITIAL_PACKET = new byte[INITIAL_PACKET_SIZE]; + + /** + * + */ + static { + Random r = new Random(); + + for (int i = 0; i < INITIAL_PACKET_SIZE; i++) + r.nextBytes(INITIAL_PACKET); + } + + /** */ + private final int connCnt; + + /** */ + private final String host; + + /** */ + private final int port; + + /** */ + private final ExecutorService exec; + + /** */ + private final byte[] buf = new byte[(int)(65536*1.5)]; + + /** + * @param connCnt Connections count. + * @param host Host. + * @param port Port. + */ + public GridNioBenchmarkClient(int connCnt, String host, int port) { + this.connCnt = connCnt; + this.host = host; + this.port = port; + + exec = Executors.newFixedThreadPool(connCnt); + } + + /** + * Runs single benchamark configuration. + * + * @throws IOException If connection failed. + * @throws InterruptedException If benchmark was interrupted. + */ + public void run() throws IOException, InterruptedException { + for (int i = 0; i < connCnt; i++) + exec.submit(new ClientThread()); + + Thread.sleep(5*60*1000); + + exec.shutdownNow(); + } + + /** + * Runs set of tests. + * + * @param args Command line arguments. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + if (args.length != 3) { + X.println("Usage: " + GridNioBenchmarkClient.class.getSimpleName() + " <connections count> <host> <port>"); + + return; + } + + final int connCnt = Integer.parseInt(args[0]); + final String host = args[1]; + final int port = Integer.parseInt(args[2]); + + new GridNioBenchmarkClient(connCnt, host, port).run(); + } + + /** + * Test thread. + */ + private class ClientThread implements Runnable { + /** {@inheritDoc} */ + @Override public void run() { + Socket s = new Socket(); + + try { + s.connect(new InetSocketAddress(host, port)); + + InputStream in = s.getInputStream(); + OutputStream out = s.getOutputStream(); + + out.write(INITIAL_PACKET); + + for (int i = 0; i < 1000000; i++) + doIteration(in, out); + + long bytes = 0; + + long start = System.currentTimeMillis(); + + while (!Thread.interrupted()) + bytes += doIteration(in, out); + + long duration = System.currentTimeMillis() - start; + + long mb = bytes/1048576; + + X.println("Thread finished [MB=" + bytes/1048576 + ", MB/s=" + ((double)mb)*1000/duration + "]"); + } + catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Performs single test iterations. + * + * @param in Stream to read data. + * @param out Stream to write data. + * @return Echoed bytes count. + * @throws IOException If failed. + */ + @SuppressWarnings("CallToThreadYield") + private long doIteration(InputStream in, OutputStream out) throws IOException { + int read = in.read(buf, 0, in.available()); + + if (read == 0) + Thread.yield(); + + out.write(buf, 0, read); + + return read; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkTest.java new file mode 100644 index 0000000..c132aa0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.nio; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.logger.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.nio.*; + +/** + * + */ +public class GridNioBenchmarkTest { + /** */ + private final int port; + + /** */ + private final int selectorCnt; + + /** + * @param selectorCnt Selector count. + * @param port Port. + */ + public GridNioBenchmarkTest(int selectorCnt, int port) { + this.selectorCnt = selectorCnt; + this.port = port; + } + + /** + * Runs the benchmark. + * + * @throws UnknownHostException If can't connect to given hist, + * @throws IgniteCheckedException If NIO server initialisation failed. + */ + @SuppressWarnings("ConstantConditions") + public void run() throws UnknownHostException, IgniteCheckedException { + GridNioServerListener<ByteBuffer> lsnr = new GridNioServerListenerAdapter<ByteBuffer>() { + @Override public void onConnected(GridNioSession ses) { + X.print("New connection accepted."); + } + + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + // No-op. + } + + @Override public void onMessage(GridNioSession ses, ByteBuffer msg) { + ByteBuffer buf = ByteBuffer.allocate(msg.remaining()).put(msg); + buf.position(0); + ses.send(buf); + } + + @Override public void onSessionWriteTimeout(GridNioSession ses) { + X.error("Session write timeout. Closing."); + } + + @Override public void onSessionIdleTimeout(GridNioSession ses) { + X.error("Session idle timeout. Closing."); + } + }; + + IgniteLogger log = new GridTestLog4jLogger(U.resolveGridGainUrl("config/gridgain-log4j.xml")); + + GridNioServer.<ByteBuffer>builder() + .address(InetAddress.getByName("localhost")) + .port(port) + .listener(lsnr) + .logger(log) + .selectorCount(selectorCnt) + .gridName("") + .tcpNoDelay(false) + .directBuffer(false) + .byteOrder(ByteOrder.nativeOrder()) + .socketSendBufferSize(0) + .socketReceiveBufferSize(0) + .sendQueueLimit(0) + .build() + .start(); + } + + /** + * Runs the benchmark. + * + * @param args Command line arguments. + * @throws UnknownHostException If can't connect to given hist, + * @throws IgniteCheckedException If NIO server initialisation failed. + */ + public static void main(String[] args) throws UnknownHostException, IgniteCheckedException { + if (args.length != 2) { + X.println("Usage: " + GridNioBenchmarkTest.class.getSimpleName() + " <threads> <port>"); + + return; + } + + final int threads = Integer.parseInt(args[0]); + final int port = Integer.parseInt(args[1]); + + new GridNioBenchmarkTest(threads, port).run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapMapPerformanceAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapMapPerformanceAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapMapPerformanceAbstractTest.java new file mode 100644 index 0000000..8ddd099 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapMapPerformanceAbstractTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.offheap; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.offheap.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; + +/** + * Tests off-heap map. + */ +public abstract class GridOffHeapMapPerformanceAbstractTest extends GridCommonAbstractTest { + /** Random. */ + private static final Random RAND = new Random(); + + /** */ + protected static final int LOAD_CNT = 1024 * 1024; + + /** Sample map. */ + private static Map<String, T3<String, byte[], byte[]>> kvMap = + new HashMap<>(LOAD_CNT); + + /** Unsafe map. */ + private GridOffHeapMap<String> map; + + /** */ + protected float load = 0.75f; + + /** */ + protected int initCap = 1024 * 1024 * 1024; + + /** */ + protected int concurrency = 16; + + /** */ + protected short lruStripes = 16; + + /** */ + protected GridOffHeapEvictListener evictClo; + + /** */ + protected long mem = 12L * 1024L * 1024L * 1024L; + + /** */ + protected long dur = 60 * 1000;//2 * 60 * 60 * 1000; + + /** + * + */ + protected GridOffHeapMapPerformanceAbstractTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + map = newMap(); + + if (kvMap.isEmpty()) + for (int i = 0; i < LOAD_CNT; i++) { + String k = string(); + String v = string(); + + kvMap.put(k, new T3<>(v, k.getBytes(), v.getBytes())); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + if (map != null) + map.destruct(); + } + + /** + * @return New map. + */ + protected abstract <K> GridOffHeapMap<K> newMap(); + + /** + * @param key Key. + * @return Hash. + */ + private int hash(Object key) { + return hash(key.hashCode()); + } + + /** + * @param h Hashcode. + * @return Hash. + */ + private int hash(int h) { + // Apply base step of MurmurHash; see http://code.google.com/p/smhasher/ + // Despite two multiplies, this is often faster than others + // with comparable bit-spread properties. + h ^= h >>> 16; + h *= 0x85ebca6b; + h ^= h >>> 13; + h *= 0xc2b2ae35; + + return (h >>> 16) ^ h; + } + + /** + * + * @return New Object. + */ + private String string() { + String key = ""; + + for (int i = 0; i < 3; i++) + key += RAND.nextLong(); + + return key; + } + + /** + * Test plain hash map. + */ + public void testHashMapPutRemove() { + Map<String, String> map = new HashMap<>(LOAD_CNT); + + info("Starting standard HashMap performance test..."); + + long cnt = 0; + + long start = System.currentTimeMillis(); + + boolean rmv = false; + + boolean done = false; + + while (!done) { + for (Map.Entry<String, T3<String, byte[], byte[]>> e : kvMap.entrySet()) { + String key = e.getKey(); + T3<String, byte[], byte[]> t = e.getValue(); + + try { + if (rmv) + map.remove(key); + else + map.put(key, t.get1()); + } + catch (GridOffHeapOutOfMemoryException ex) { + error("Map put failed for count: " + cnt, ex); + + throw ex; + } + + if (cnt > 0 && cnt % 10000000 == 0) { + long cur = System.currentTimeMillis(); + + long throughput = cnt * 1000 / (cur - start); + + X.println("Insert [cnt=" + cnt + ", ops/sec=" + throughput + ']'); + + if ((cur - start) > dur) { + done = true; + + break; + } + } + + cnt++; + } + + rmv = !rmv; + } + } + + /** + * + */ + public void testInsertRemoveLoad() { + info("Starting insert performance test..."); + + long cnt = 0; + + long start = System.currentTimeMillis(); + + boolean rmv = false; + + boolean done = false; + + while (!done) { + for (Map.Entry<String, T3<String, byte[], byte[]>> e : kvMap.entrySet()) { + String key = e.getKey(); + T3<String, byte[], byte[]> t = e.getValue(); + + try { + if (rmv) + map.remove(hash(key), t.get2()); + else + map.insert(hash(key), t.get2(), t.get3()); + } + catch (GridOffHeapOutOfMemoryException ex) { + error("Map put failed for count: " + cnt, ex); + + throw ex; + } + + if (cnt > 0 && cnt % 10000000 == 0) { + long cur = System.currentTimeMillis(); + + long throughput = cnt * 1000 / (cur - start); + + X.println("Insert [cnt=" + cnt + ", ops/sec=" + throughput + ']'); + + if ((cur - start) > dur) { + done = true; + + break; + } + } + + cnt++; + } + + rmv = !rmv; + } + } + + + /** + * + */ + public void testPutRemoveLoad() { + info("Starting put performance test..."); + + long cnt = 0; + + long start = System.currentTimeMillis(); + + boolean rmv = false; + + boolean done = false; + + while (!done) { + for (Map.Entry<String, T3<String, byte[], byte[]>> e : kvMap.entrySet()) { + String key = e.getKey(); + T3<String, byte[], byte[]> t = e.getValue(); + + try { + if (rmv) + map.remove(hash(key), t.get2()); + else + map.put(hash(key), t.get2(), t.get3()); + } + catch (GridOffHeapOutOfMemoryException ex) { + error("Map put failed for count: " + cnt, ex); + + throw ex; + } + + if (cnt > 0 && cnt % 10000000 == 0) { + long cur = System.currentTimeMillis(); + + long throughput = cnt * 1000 / (cur - start); + + X.println("Put [cnt=" + cnt + ", ops/sec=" + throughput + ']'); + + if ((cur - start) > dur) { + done = true; + + break; + } + } + + cnt++; + } + + rmv = cnt % 3 == 0; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapPartitionedMapPerformanceAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapPartitionedMapPerformanceAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapPartitionedMapPerformanceAbstractTest.java new file mode 100644 index 0000000..6e9c145 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapPartitionedMapPerformanceAbstractTest.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.offheap; + +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.offheap.*; +import org.gridgain.testframework.junits.common.*; +import org.jdk8.backport.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Performance test for partitioned offheap hash map. + */ +@SuppressWarnings({"unchecked", "NonThreadSafeLazyInitialization"}) +public abstract class GridOffHeapPartitionedMapPerformanceAbstractTest extends GridCommonAbstractTest { + /** */ + protected static final int LOAD_CNT = 256; + + /** Sample keys. */ + private static T3<Integer, Integer, byte[]> keys[]; + + /** Wrapped keys. */ + private static GridByteArrayWrapper[] wrappers; + + /** Unsafe map. */ + private GridOffHeapPartitionedMap map; + + /** */ + protected float load = 0.75f; + + /** */ + protected int concurrency = 16; + + /** */ + protected short lruStripes = 16; + + /** */ + protected long mem = 2L * 1024L * 1024L * 1024L; + + /** */ + protected long dur = 120 * 1000; + + /** + * + */ + protected GridOffHeapPartitionedMapPerformanceAbstractTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + map = newMap(); + + if (keys == null) { + keys = new T3[LOAD_CNT]; + wrappers = new GridByteArrayWrapper[LOAD_CNT]; + + GridCacheAffinityFunction aff = new GridCacheConsistentHashAffinityFunction(); + + Random rnd = new Random(); + + for (int i = 0; i < LOAD_CNT; i++) { + byte[] key = new byte[rnd.nextInt(511) + 1]; + + rnd.nextBytes(key); + + GridByteArrayWrapper wrap = new GridByteArrayWrapper(key); + + keys[i] = new T3<>(aff.partition(wrap), wrap.hashCode(), key); + wrappers[i] = wrap; + } + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + if (map != null) + map.destruct(); + } + + /** + * @return New map. + */ + protected abstract GridOffHeapPartitionedMap newMap(); + + /** + * @throws Exception If failed. + */ + public void testPuts() throws Exception { + info("Warming up..."); + + checkPuts(1, 20000); + + info("Warm up finished."); + + checkPuts(Runtime.getRuntime().availableProcessors(), dur); + } + + /** + * @throws Exception If failed. + */ + public void testPutsConcurrentMap() throws Exception { + info("Warming up..."); + + checkPutsConcurrentMap(1, 20000); + + info("Warm up finished."); + + checkPutsConcurrentMap(Runtime.getRuntime().availableProcessors(), dur); + } + + /** + * @throws Exception If failed. + */ + public void testPutRemoves() throws Exception { + info("Warming up..."); + + checkPutRemoves(2, 20000); + + info("Warm up finished."); + + checkPutRemoves(Runtime.getRuntime().availableProcessors(), dur); + } + + /** + * @throws Exception If failed. + */ + public void testPutRemovesConcurrentMap() throws Exception { + info("Warming up..."); + + checkPutRemovesConcurrentMap(2, 20000); + + info("Warm up finished."); + + checkPutRemovesConcurrentMap(Runtime.getRuntime().availableProcessors(), dur); + } + + /** + * @throws Exception If failed. + */ + private void checkPuts(int threadCnt, long duration) throws Exception { + final AtomicLong opCnt = new AtomicLong(); + final AtomicLong totalOpCnt = new AtomicLong(); + + final AtomicBoolean done = new AtomicBoolean(); + + long start = System.currentTimeMillis(); + + IgniteFuture<?> fut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Random rnd = new Random(); + + byte[] val = new byte[1024]; + + long locTotalOpCnt = 0; + + while (!done.get()) { + for (int i = 0; i < 500; i++) { + T3<Integer, Integer, byte[]> key = randomKey(rnd); + + map.put(key.get1(), key.get2(), key.get3(), val); + } + + locTotalOpCnt += 500; + opCnt.addAndGet(500); + } + + totalOpCnt.addAndGet(locTotalOpCnt); + + return null; + } + }, threadCnt); + + final int step = 2000; + + while (System.currentTimeMillis() - start < duration) { + U.sleep(step); + + long ops = opCnt.getAndSet(0); + + info("Putting " + (ops * 1000) / step + " ops/sec"); + } + + done.set(true); + + fut.get(); + + long end = System.currentTimeMillis(); + + info("Average put performance: " + (totalOpCnt.get() * 1000) / (end - start) + " ops/sec"); + } + + /** + * @throws Exception If failed. + */ + private void checkPutRemoves(int threadCnt, long duration) throws Exception { + final AtomicLong opCnt = new AtomicLong(); + final AtomicLong totalOpCnt = new AtomicLong(); + + final AtomicBoolean done = new AtomicBoolean(); + + long start = System.currentTimeMillis(); + + IgniteFuture<?> fut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Random rnd = new Random(); + + byte[] val = new byte[1024]; + + long locTotalOpCnt = 0; + + while (!done.get()) { + for (int i = 0; i < 500; i++) { + T3<Integer, Integer, byte[]> key = randomKey(rnd); + + int op = rnd.nextInt(2); + + switch (op) { + case 0: + map.put(key.get1(), key.get2(), key.get3(), val); + + break; + + case 1: + map.remove(key.get1(), key.get2(), key.get3()); + + break; + + default: + assert false; + } + } + + locTotalOpCnt += 500; + opCnt.addAndGet(500); + } + + totalOpCnt.addAndGet(locTotalOpCnt); + + return null; + } + }, threadCnt); + + final int step = 2000; + + while (System.currentTimeMillis() - start < duration) { + U.sleep(step); + + long ops = opCnt.getAndSet(0); + + info("Putting " + (ops * 1000) / step + " ops/sec"); + } + + done.set(true); + + fut.get(); + + long end = System.currentTimeMillis(); + + info("Average random operation performance: " + (totalOpCnt.get() * 1000) / (end - start) + " ops/sec"); + } + /** + * @throws Exception If failed. + */ + private void checkPutsConcurrentMap(int threadCnt, long duration) throws Exception { + final Map<GridByteArrayWrapper, byte[]> map = new ConcurrentHashMap8<>(); + + final AtomicLong opCnt = new AtomicLong(); + final AtomicLong totalOpCnt = new AtomicLong(); + + final AtomicBoolean done = new AtomicBoolean(); + + long start = System.currentTimeMillis(); + + IgniteFuture<?> fut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Random rnd = new Random(); + + long locTotalOpCnt = 0; + + while (!done.get()) { + for (int i = 0; i < 500; i++) { + GridByteArrayWrapper key = randomKeyWrapper(rnd); + + map.put(key, new byte[1024]); + } + + locTotalOpCnt += 500; + opCnt.addAndGet(500); + } + + totalOpCnt.addAndGet(locTotalOpCnt); + + return null; + } + }, threadCnt); + + final int step = 2000; + + while (System.currentTimeMillis() - start < duration) { + U.sleep(step); + + long ops = opCnt.getAndSet(0); + + info("Putting " + (ops * 1000) / step + " ops/sec"); + } + + done.set(true); + + fut.get(); + + long end = System.currentTimeMillis(); + + info("Average put performance: " + (totalOpCnt.get() * 1000) / (end - start) + " ops/sec"); + } + + /** + * @throws Exception If failed. + */ + private void checkPutRemovesConcurrentMap(int threadCnt, long duration) throws Exception { + final Map<GridByteArrayWrapper, byte[]> map = new ConcurrentHashMap8<>(); + + final AtomicLong opCnt = new AtomicLong(); + final AtomicLong totalOpCnt = new AtomicLong(); + + final AtomicBoolean done = new AtomicBoolean(); + + long start = System.currentTimeMillis(); + + IgniteFuture<?> fut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Random rnd = new Random(); + + byte[] val = new byte[1024]; + + long locTotalOpCnt = 0; + + while (!done.get()) { + for (int i = 0; i < 500; i++) { + GridByteArrayWrapper key = randomKeyWrapper(rnd); + + int op = rnd.nextInt(2); + + switch (op) { + case 0: + map.put(key, val); + + break; + + case 1: + map.remove(key); + + break; + + default: + assert false; + } + } + + locTotalOpCnt += 500; + opCnt.addAndGet(500); + } + + totalOpCnt.addAndGet(locTotalOpCnt); + + return null; + } + }, threadCnt); + + final int step = 2000; + + while (System.currentTimeMillis() - start < duration) { + U.sleep(step); + + long ops = opCnt.getAndSet(0); + + info("Putting " + (ops * 1000) / step + " ops/sec"); + } + + done.set(true); + + fut.get(); + + long end = System.currentTimeMillis(); + + info("Average random operation performance: " + (totalOpCnt.get() * 1000) / (end - start) + " ops/sec"); + } + + /** + * Gets random key from pregenerated array. + * + * @param rnd Random to use. + * @return Tuple with key. + */ + private T3<Integer, Integer, byte[]> randomKey(Random rnd) { + return keys[rnd.nextInt(keys.length)]; + } + + /** + * Gets random key from pregenerated array. + * + * @param rnd Random to use. + * @return Tuple with key. + */ + private GridByteArrayWrapper randomKeyWrapper(Random rnd) { + return wrappers[rnd.nextInt(keys.length)]; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafeMapPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafeMapPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafeMapPerformanceTest.java new file mode 100644 index 0000000..ca8822f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafeMapPerformanceTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.offheap.unsafe; + +import org.apache.ignite.internal.util.offheap.*; + +/** + * Unsafe map test. + */ +public class GridUnsafeMapPerformanceTest extends GridOffHeapMapPerformanceAbstractTest { + /** {@inheritDoc} */ + @Override protected <K> GridOffHeapMap<K> newMap() { + return GridOffHeapMapFactory.unsafeMap(concurrency, load, initCap, mem, lruStripes, evictClo); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafePartitionedMapPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafePartitionedMapPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafePartitionedMapPerformanceTest.java new file mode 100644 index 0000000..3af4c71 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafePartitionedMapPerformanceTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.offheap.unsafe; + +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.internal.util.offheap.*; + +/** + * Unsafe partitioned map performance test. + */ +public class GridUnsafePartitionedMapPerformanceTest extends GridOffHeapPartitionedMapPerformanceAbstractTest { + /** {@inheritDoc} */ + @Override protected GridOffHeapPartitionedMap newMap() { + return GridOffHeapMapFactory.unsafePartitionedMap(GridCacheConsistentHashAffinityFunction.DFLT_PARTITION_COUNT, + Runtime.getRuntime().availableProcessors(), load, LOAD_CNT, mem, lruStripes, null); + } +}