http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java new file mode 100644 index 0000000..0f868a2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.streamer; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; + +/** + * Closure for events generation. + */ +class EventClosure implements IgniteInClosure<IgniteStreamer> { + /** Random range. 
*/ + private int rndRange = 100; + + /** {@inheritDoc} */ + @Override public void apply(IgniteStreamer streamer) { + Random rnd = new Random(); + + while (!Thread.interrupted()) { + try { + streamer.addEvent(rnd.nextInt(rndRange)); + } + catch (IgniteCheckedException e) { + X.println("Failed to add streamer event: " + e); + } + } + } + + /** + * @return Random range. + */ + public int getRandomRange() { + return rndRange; + } + + /** + * @param rndRange Random range. + */ + public void setRandomRange(int rndRange) { + this.rndRange = rndRange; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java new file mode 100644 index 0000000..8e74219 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.streamer; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.springframework.beans.factory.xml.*; +import org.springframework.context.support.*; +import org.springframework.core.io.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Streamer benchmark. + */ +public class GridStreamerBenchmark { + + /** + * Entry point. Expects configuration URL to be provided. 
+ * + * @param args Arguments. First argument is grid configuration. Second optional argument "-w" - stands for + * "worker", in this case no load will be generated on the node. + * @throws Exception In case of any error. + */ + public static void main(String[] args) throws Exception{ + if (args.length == 0) + throw new IllegalArgumentException("Configuration path is not provided."); + + String cfgPath = args.length > 0 ? args[0] : + "modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml"; + + boolean worker = args.length > 1 && "-w".equalsIgnoreCase(args[1]); + + // Get load definitions. + Collection<GridStreamerLoad> loads = worker ? null : loads(cfgPath); + + // Start the grid. + Ignite ignite = G.start(cfgPath); + + // Start load threads. + Collection<Thread> loadThreads = new HashSet<>(); + + if (loads != null && !loads.isEmpty()) { + for (GridStreamerLoad load : loads) { + final IgniteStreamer streamer = ignite.streamer(load.getName()); + + if (streamer == null) + throw new Exception("Steamer is not found: " + load.getName()); + + List<IgniteInClosure<IgniteStreamer>> clos = load.getClosures(); + + if (clos != null && !clos.isEmpty()) { + for (final IgniteInClosure<IgniteStreamer> clo : clos) { + Thread t = new Thread(new Runnable() { + @Override public void run() { + try { + clo.apply(streamer); + } + catch (Exception e) { + X.println("Exception during execution of closure for streamer " + + "[streamer=" + streamer.name() + ", closure=" + clo + ", err=" + + e.getMessage() + ']'); + + e.printStackTrace(); + } + } + }); + + loadThreads.add(t); + + t.start(); + } + } + } + } + + // Once all loads are started, simply join them. 
+ System.out.println("Press enter to stop running benchmark."); + + try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) { + in.readLine(); + } + + for (Thread t : loadThreads) + t.interrupt(); + + for (Thread t : loadThreads) + t.join(); + } + + /** + * Get loads from the Spring context. + * + * @param cfgPath Configuration path. + * @return Collection of loads, if any. + * @throws Exception If failed. + */ + private static Collection<GridStreamerLoad> loads(String cfgPath) throws Exception { + URL cfgUrl; + + try { + cfgUrl = new URL(cfgPath); + } + catch (MalformedURLException ignore) { + cfgUrl = U.resolveGridGainUrl(cfgPath); + } + + if (cfgUrl == null) + throw new Exception("Spring XML configuration path is invalid: " + cfgPath); + + GenericApplicationContext springCtx = new GenericApplicationContext(); + + new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new UrlResource(cfgUrl)); + + springCtx.refresh(); + + Map<String, GridStreamerLoad> cfgMap = springCtx.getBeansOfType(GridStreamerLoad.class); + + return cfgMap.values(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java new file mode 100644 index 0000000..5e19cec --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.streamer; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.streamer.index.*; +import org.apache.ignite.streamer.index.hash.*; +import org.apache.ignite.streamer.index.tree.*; +import org.apache.ignite.streamer.window.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.loadtests.util.GridLoadTestArgs.*; +import static org.gridgain.testframework.GridTestUtils.*; + +/** + * Load test for streamer index. + */ +public class GridStreamerIndexLoadTest { + /** + * Window index configuration. + */ + private enum IndexConfiguration { + /** + * Tree index with non-unique elements and no event tracking. + */ + TREE_INDEX_NOT_UNIQUE { + /** {@inheritDoc} */ + @Override + StreamerIndexProvider<Integer, Integer, Long> indexProvider() { + StreamerTreeIndexProvider<Integer, Integer, Long> idx = new StreamerTreeIndexProvider<>(); + + idx.setUpdater(new IndexUpdater()); + idx.setUnique(false); + idx.setPolicy(StreamerIndexPolicy.EVENT_TRACKING_OFF); + + return idx; + } + }, + + /** + * Hash index with non-unique elements and no event tracking. 
+ */ + HASH_INDEX_NOT_UNIQUE { + /** {@inheritDoc} */ + @Override + StreamerIndexProvider<Integer, Integer, Long> indexProvider() { + StreamerHashIndexProvider<Integer, Integer, Long> idx = new StreamerHashIndexProvider<>(); + + idx.setUpdater(new IndexUpdater()); + idx.setUnique(false); + idx.setPolicy(StreamerIndexPolicy.EVENT_TRACKING_OFF); + + return idx; + } + }; + + /** + * @return Index provider for this index configuration. + */ + abstract StreamerIndexProvider<Integer, Integer, Long> indexProvider(); + } + + /** + * @param args Command line arguments. + * @throws Exception If error occurs. + */ + public static void main(String[] args) throws Exception { + for (IndexConfiguration idxCfg : EnumSet.allOf(IndexConfiguration.class)) { + X.println(">>> Running benchmark for configuration: " + idxCfg); + + runBenchmark(idxCfg); + } + } + + /** + * Runs the benchmark for the specified index configuration. + * + * @param idxCfg Index configuration. + * @throws Exception If error occurs. 
+ */ + public static void runBenchmark(IndexConfiguration idxCfg) throws Exception { + int thrCnt = getIntProperty(THREADS_CNT, 1); + int dur = getIntProperty(TEST_DUR_SEC, 60); + int winSize = getIntProperty("GG_WIN_SIZE", 5000); + + dumpProperties(System.out); + + final StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>(); + + win.setMaximumSize(winSize); + win.setIndexes(idxCfg.indexProvider()); + + win.start(); + + final AtomicLong enqueueCntr = new AtomicLong(); + + IgniteFuture<Long> enqueueFut = runMultiThreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!Thread.currentThread().isInterrupted()) { + win.enqueue(rnd.nextInt()); + + enqueueCntr.incrementAndGet(); + } + } + }, thrCnt, "generator"); + + final AtomicLong evictCntr = new AtomicLong(); + + IgniteFuture<Long> evictFut = runMultiThreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + while (!Thread.currentThread().isInterrupted()) { + win.pollEvicted(); + + evictCntr.incrementAndGet(); + } + } + }, thrCnt, "evictor"); + + IgniteFuture<Long> collFut = runMultiThreadedAsync(new CAX() { + @Override public void applyx() { + int nSec = 0; + long prevEnqueue = enqueueCntr.get(); + long prevEvict = evictCntr.get(); + + try { + while (!Thread.currentThread().isInterrupted()) { + U.sleep(1000); + nSec++; + + long curEnqueue = enqueueCntr.get(); + long curEvict = evictCntr.get(); + + X.println("Stats [enqueuePerSec=" + (curEnqueue - prevEnqueue) + + ", evictPerSec=" + (curEvict - prevEvict) + ']'); + + prevEnqueue = curEnqueue; + prevEvict = curEvict; + } + } + catch (IgniteInterruptedException ignored) { + // No-op. 
+ } + + X.println("Final results [enqueuePerSec=" + (enqueueCntr.get() / nSec) + + ", evictPerSec=" + (evictCntr.get() / nSec) + ']'); + } + }, 1, "collector"); + + U.sleep(dur * 1000); + + X.println("Finishing test."); + + collFut.cancel(); + enqueueFut.cancel(); + evictFut.cancel(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java new file mode 100644 index 0000000..583ed56 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.streamer; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Configurable streamer load. + */ +public class GridStreamerLoad { + /** Steamer name. */ + private String name; + + /** Load closures. 
*/ + private List<IgniteInClosure<IgniteStreamer>> clos; + + /** + * @return Streamer name. + */ + public String getName() { + return name; + } + + /** + * @param name Streamer name. + */ + public void setName(String name) { + this.name = name; + } + + /** + * @return Load closures. + */ + public List<IgniteInClosure<IgniteStreamer>> getClosures() { + return clos; + } + + /** + * @param clos Load closures. + */ + public void setClosures(List<IgniteInClosure<IgniteStreamer>> clos) { + this.clos = clos; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java new file mode 100644 index 0000000..95d76f5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.streamer; + +import org.apache.ignite.streamer.index.*; +import org.jetbrains.annotations.*; + +/** + * Streamer benchmark window index updater. + */ +class IndexUpdater implements StreamerIndexUpdater<Integer, Integer, Long> { + /** {@inheritDoc} */ + @Override public Integer indexKey(Integer evt) { + return evt; + } + + /** {@inheritDoc} */ + @Nullable @Override public Long onAdded(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) { + return entry.value() + 1; + } + + /** {@inheritDoc} */ + @Nullable @Override public Long onRemoved(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) { + return entry.value() - 1 == 0 ? null : entry.value() - 1; + } + + /** {@inheritDoc} */ + @Override public Long initialValue(Integer evt, Integer key) { + return 1L; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java new file mode 100644 index 0000000..49f2470 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.streamer; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Closure for events generation. + */ +class QueryClosure implements IgniteInClosure<IgniteStreamer> { + /** Sleep period (seconds). */ + private static final int SLEEP_PERIOD_SEC = 3; + + /** Random range. */ + private int rndRange = 100; + + /** Warmup time. */ + private long warmup = 60000; + + /** {@inheritDoc} */ + @Override public void apply(IgniteStreamer streamer) { + X.println("Pefromrming warmup: " + warmup + "ms..."); + + try { + U.sleep(warmup); + } + catch (IgniteInterruptedException ignore) { + return; + } + + long initTime = System.currentTimeMillis(); + long initExecs = streamer.metrics().stageTotalExecutionCount(); + + long prevExecs = initExecs; + + while (!Thread.interrupted()) { + try { + U.sleep(SLEEP_PERIOD_SEC * 1000); + } + catch (IgniteInterruptedException ignore) { + return; + } + + long curTime = System.currentTimeMillis(); + long curExecs = streamer.metrics().stageTotalExecutionCount(); + + long deltaExecs = curExecs - prevExecs; + long deltaThroughput = deltaExecs/SLEEP_PERIOD_SEC; + + long totalTimeSec = (curTime - initTime) / 1000; + long totalExecs = curExecs - initExecs; + long totalThroughput = totalExecs/totalTimeSec; + + X.println("Measurement: [throughput=" + deltaThroughput + " execs/sec, totalThroughput=" + + totalThroughput + " execs/sec]"); + + prevExecs = curExecs; + } + } + + 
/** + * @return Random range. + */ + public int getRandomRange() { + return rndRange; + } + + /** + * @param rndRange Random range. + */ + public void setRandomRange(int rndRange) { + this.rndRange = rndRange; + } + + /** + * @return Warmup time (milliseconds) + */ + public long getWarmup() { + return warmup; + } + + /** + * @param warmup Warmup time (milliseconds) + */ + public void setWarmup(long warmup) { + this.warmup = warmup; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java new file mode 100644 index 0000000..fdf2aa7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.streamer.average; + +/** + * Average helper class. 
+ */ +class TestAverage { + /** */ + private int total; + + /** */ + private int cnt; + + /** + * @param avg Average. + */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + public void increment(TestAverage avg) { + int total; + int cnt; + + synchronized (avg) { + total = avg.total; + cnt = avg.cnt; + } + + increment(total, cnt); + } + + /** + * @param total Increment total. + * @param cnt Increment count. + */ + public synchronized void increment(int total, int cnt) { + this.total += total; + this.cnt += cnt; + } + + /** + * @param total Total. + * @param cnt Count. + */ + public synchronized void set(int total, int cnt) { + this.total = total; + this.cnt = cnt; + } + + /** + * @return Running average. + */ + public synchronized double average() { + return (double)total / cnt; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java new file mode 100644 index 0000000..cf8ac0b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.streamer.average; + +import org.apache.ignite.*; +import org.apache.ignite.streamer.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Stage for average benchmark. + */ +class TestStage implements StreamerStage<Integer> { + /** {@inheritDoc} */ + @Override public String name() { + return "stage"; + } + + /** {@inheritDoc} */ + @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> evts) + throws IgniteCheckedException { + ConcurrentMap<String, TestAverage> loc = ctx.localSpace(); + + TestAverage avg = loc.get("avg"); + + if (avg == null) + avg = F.addIfAbsent(loc, "avg", new TestAverage()); + + for (Integer e : evts) + avg.increment(e, 1); + + StreamerWindow<Integer> win = ctx.window(); + + win.enqueueAll(evts); + + while (true) { + Integer e = win.pollEvicted(); + + if (e == null) + break; + + // Subtract evicted events from running total. 
+ avg.increment(-e, -1); + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/swap/GridSwapEvictAllBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/swap/GridSwapEvictAllBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/swap/GridSwapEvictAllBenchmark.java new file mode 100644 index 0000000..9d2c3b9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/swap/GridSwapEvictAllBenchmark.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.loadtests.swap; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.file.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.loadtests.util.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Swap benchmark. + */ +@SuppressWarnings("BusyWait") +public class GridSwapEvictAllBenchmark { + /** Eviction policy size. */ + public static final int EVICT_PLC_SIZE = 3200000; + + /** Keys count. */ + public static final int KEYS_CNT = 3000000; + + /** Batch size. */ + private static final int BATCH_SIZE = 200; + + /** + * @param args Parameters. + * @throws Exception If failed. + */ + public static void main(String ... args) throws Exception { + GridFileLock fileLock = GridLoadTestUtils.fileLock(); + + fileLock.lock(); + + try { + String outputFileName = args.length > 0 ? args[0] : null; + + Ignite g = start(new CacheStoreAdapter<Long, String>() { + @Nullable @Override public String load(Long key) { + return null; + } + + @Override public void loadCache(final IgniteBiInClosure<Long, String> c, + @Nullable Object... args) { + for (int i = 0; i < KEYS_CNT; i++) + c.apply((long)i, String.valueOf(i)); + } + + @Override public void write(Cache.Entry<? extends Long, ? 
extends String> e) { + assert false; + } + + @Override public void delete(Object key) { + assert false; + } + }); + + try { + GridCache<Object, Object> cache = g.cache(null); + + assert cache != null; + + cache.loadCache(null, 0); + + X.println("Finished load cache."); + + // Warm-up. + runBenchmark(BATCH_SIZE, BATCH_SIZE, null); + + // Run. + runBenchmark(KEYS_CNT, BATCH_SIZE, outputFileName); + + assert g.configuration().getSwapSpaceSpi().count(null) == 0; + } + finally { + G.stopAll(false); + } + } + finally { + fileLock.close(); + } + } + + /** + * @param keysCnt Number of keys to swap and promote. + * @param batchSize Size of batch to swap/promote. + * @param outputFileName Output file name. + * @throws Exception If failed. + */ + private static void runBenchmark(final int keysCnt, int batchSize, @Nullable String outputFileName) + throws Exception { + assert keysCnt % batchSize == 0; + + final AtomicInteger evictedKeysCnt = new AtomicInteger(); + + final GridCumulativeAverage evictAvg = new GridCumulativeAverage(); + + Thread evictCollector = GridLoadTestUtils.startDaemon(new Runnable() { + @Override public void run() { + int curCnt = evictedKeysCnt.get(); + + try { + while (!Thread.currentThread().isInterrupted()) { + Thread.sleep(1000); + + int newCnt = evictedKeysCnt.get(); + + int entPerSec = newCnt - curCnt; + + X.println(">>> Evicting " + entPerSec + " entries/second"); + + evictAvg.update(entPerSec); + + curCnt = newCnt; + } + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + finally { + X.println(">>> Average eviction speed: " + evictAvg + " entries/second"); + } + } + }); + + long start = System.currentTimeMillis(); + + GridCache<Object, Object> cache = G.ignite().cache(null); + + assert cache != null; + + Collection<Long> keys = new ArrayList<>(batchSize); + + for (long i = 0; i < keysCnt; i++) { + keys.add(i); + + if (keys.size() == batchSize) { + cache.evictAll(keys); + + evictedKeysCnt.addAndGet(batchSize); + + 
keys.clear(); + } + } + + assert keys.isEmpty(); + + long end = System.currentTimeMillis(); + + X.println("Done evicting in " + (end - start) + "ms"); + + evictCollector.interrupt(); + + final AtomicInteger unswappedKeys = new AtomicInteger(); + + final GridCumulativeAverage unswapAvg = new GridCumulativeAverage(); + + Thread unswapCollector = GridLoadTestUtils.startDaemon(new Runnable() { + @Override public void run() { + int curCnt = unswappedKeys.get(); + + try { + while (!Thread.currentThread().isInterrupted()) { + Thread.sleep(1000); + + int newCnt = unswappedKeys.get(); + + int entPerSec = newCnt - curCnt; + + X.println(">>> Unswapping " + entPerSec + " entries/second"); + + unswapAvg.update(entPerSec); + + curCnt = newCnt; + } + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + finally { + X.println(">>> Average unswapping speed: " + unswapAvg + " entries/second"); + } + } + }); + + start = System.currentTimeMillis(); + + for (long i = 0; i < keysCnt; i++) { + keys.add(i); + + if (keys.size() == batchSize) { + cache.promoteAll(keys); + + unswappedKeys.addAndGet(batchSize); + + keys.clear(); + } + } + + assert keys.isEmpty(); + + end = System.currentTimeMillis(); + + X.println("Done promote in " + (end - start) + "ms"); + + unswapCollector.interrupt(); + + if (outputFileName != null) + GridLoadTestUtils.appendLineToFile( + outputFileName, + "%s,%d,%d", + GridLoadTestUtils.DATE_TIME_FORMAT.format(new Date()), + evictAvg.get(), + unswapAvg.get() + ); + } + + /** + * @param store Cache store. + * @return Started grid. + * @throws IgniteCheckedException If failed. 
+ */ + @SuppressWarnings("unchecked") + private static Ignite start(CacheStore<Long, String> store) throws IgniteCheckedException { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setLocalHost("127.0.0.1"); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + TcpDiscoveryIpFinder finder = new TcpDiscoveryVmIpFinder(true); + + disco.setIpFinder(finder); + + cfg.setDiscoverySpi(disco); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setSwapEnabled(true); + ccfg.setEvictSynchronized(false); + ccfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy(EVICT_PLC_SIZE)); + + if (store != null) { + ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setLoadPreviousValue(true); + } + + FileSwapSpaceSpi swap = new FileSwapSpaceSpi(); + +// swap.setConcurrencyLevel(16); +// swap.setWriterThreadsCount(16); + +// swap.setLevelDbCacheSize(128 * 1024 * 1024); +// swap.setLevelDbWriteBufferSize(128 * 1024 * 1024); +// swap.setLevelDbBlockSize(1024 * 1024); +// swap.setLevelDbParanoidChecks(false); +// swap.setLevelDbVerifyChecksums(false); + + cfg.setSwapSpaceSpi(swap); + + ccfg.setCacheMode(GridCacheMode.LOCAL); + ccfg.setQueryIndexEnabled(false); + + cfg.setCacheConfiguration(ccfg); + + return G.start(cfg); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/apache/ignite/loadtests/util/GridCumulativeAverage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/util/GridCumulativeAverage.java b/modules/core/src/test/java/org/apache/ignite/loadtests/util/GridCumulativeAverage.java new file mode 100644 index 0000000..567360a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/util/GridCumulativeAverage.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + 
/**
 * Counts the cumulative average as new data arrives.
 * <p>
 * The average is always derived from the exact running sum of all values passed to
 * {@link #update(long)}, so integer truncation happens once per read-out and does not
 * accumulate from one update to the next.
 * <p>
 * Instances are updated by a collector thread and read by another thread in the load
 * tests, therefore all accessors are synchronized.
 */
public class GridCumulativeAverage {
    /** Number of values accumulated so far. */
    private int cnt;

    /** Exact sum of all values accumulated so far. */
    private long sum;

    /** Current average value ({@code sum / cnt}, truncated towards zero). */
    private long cur;

    /**
     * Updates the current average and the counter, taking into account
     * the next coming value.
     *
     * @param nextVal The next value to recalculate the average with.
     */
    public synchronized void update(long nextVal) {
        // Keep the exact sum: recomputing from the previously truncated average
        // would lose the remainder on every step and drift away from the real mean.
        sum += nextVal;

        cnt++;

        cur = sum / cnt;
    }

    /**
     * @return The current average value ({@code 0} if no values were added yet).
     */
    public synchronized long get() {
        return cur;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return Long.toString(get());
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        long val = get();

        // Same value as Long.hashCode(long), spelled out to stay Java 7 compatible.
        return (int)(val ^ (val >>> 32));
    }

    /**
     * Two averages are equal when their current average values are equal.
     *
     * @param obj Object to compare with.
     * @return {@code True} if {@code obj} is a {@code GridCumulativeAverage} holding the same
     *      current average.
     */
    @Override public boolean equals(Object obj) {
        if (this == obj)
            return true;

        if (!(obj instanceof GridCumulativeAverage))
            return false;

        // Read both values through the synchronized getter one after another;
        // the two monitors are never held at the same time, so this cannot deadlock.
        return get() == ((GridCumulativeAverage)obj).get();
    }
}
+ */ + +package org.apache.ignite.loadtests.util; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Contains constants and methods for working with + * command line arguments, JVM properties and environment + * variables. + */ +public class GridLoadTestArgs { + /** Cache name. */ + public static final String CACHE_NAME = "GG_CACHE_NAME"; + + /** Threads count. */ + public static final String THREADS_CNT = "GG_THREADS_COUNT"; + + /** Test duration in seconds. */ + public static final String TEST_DUR_SEC = "GG_TEST_DUR_SEC"; + + /** Value size. */ + public static final String VALUE_SIZE = "GG_VALUE_SIZE"; + + /** Properties map for dumping. */ + private static ThreadLocal<Map<String, String>> props = new ThreadLocal<Map<String, String>>() { + @Override protected Map<String, String> initialValue() { + return new HashMap<>(); + } + }; + + /** + * Gets the value of either JVM property or environment variable, + * if property is not set. + * + * @param name Property name. + * @param dflt Default value. + * @return JVM property value or environment variable value if + * JVM property is {@code null} or default value if both + * are {@code null}. + */ + public static String getStringProperty(String name, String dflt) { + String ret = getStringProperty0(name); + + if (ret == null) + ret = dflt; + + props.get().put(name, ret); + + return ret; + } + + /** + * Gets the value of either JVM property or environment variable, + * if property is not set. + * + * @param name Property name. + * @return JVM property value or environment variable value if + * JVM property is undefined. Returns {@code null} if + * both JVM property and environment variable are not set. + */ + @Nullable public static String getStringProperty(String name) { + return saveProperty(name, getStringProperty0(name)); + } + + /** + * Helper method for getting property values. + * + * @param name Property name. 
+ * @return JVM property value or environment variable value if + * JVM property is undefined. Returns {@code null} if + * both JVM property and environment variable are not set. + */ + @Nullable private static String getStringProperty0(String name) { + String ret = System.getProperty(name); + + return ret != null ? ret : System.getenv(name); + } + + /** + * Gets the integer value of either JVM property or environment variable, + * if property is not set. + * + * @param name Property name. + * @return JVM property value or environment variable value if + * JVM property is {@code null} or {@code null} if both + * are {@code null}. + */ + @Nullable public static Integer getIntProperty(String name) { + return saveProperty(name, getIntProperty0(name)); + } + + /** + * Gets the integer value of either JVM property or environment variable, + * if property is not set. + * + * @param name Property name. + * @param dflt Default value. + * @return JVM property value or environment variable value if + * JVM property is {@code null} or default value if both + * are {@code null}. + */ + @SuppressWarnings("ConstantConditions") + public static int getIntProperty(String name, int dflt) { + Integer ret = getIntProperty0(name); + + return saveProperty(name, ret != null ? ret : dflt); + } + + /** + * Helper method for getting int properties. + * + * @param name Property name. + * @return JVM property value or environment variable value if + * JVM property is {@code null} or {@code null} if both + * are {@code null}. + */ + @Nullable private static Integer getIntProperty0(String name) { + Integer ret = Integer.getInteger(name); + + if (ret == null) { + String env = System.getenv(name); + + ret = env != null ? Integer.valueOf(env) : null; + } + + return ret; + } + + /** + * Gets the integer value of either JVM property or environment variable, + * if property is not set. + * + * @param name Property name. + * @param dflt Default value. 
+ * @param validClo Value validation closure, which returns {@code null}, if the value + * is valid, and error message, if it's not valid. + * @return JVM property value or environment variable value if + * JVM property is {@code null} or default value if both + * are {@code null}. + * @throws IgniteCheckedException If the value didn't pass the validation. + */ + public static int getIntProperty(String name, int dflt, IgniteClosure<Integer, String> validClo) + throws IgniteCheckedException { + int ret = getIntProperty(name, dflt); + + String errMsg = validClo.apply(ret); + + if (errMsg != null) + throw new IgniteCheckedException("Illegal value for " + name + " parameter: " + errMsg); + + return ret; + } + + /** + * Gets the long value of either JVM property or environment variable, + * if property is not set. + * + * @param name Property name. + * @return JVM property value or environment variable value if + * JVM property is undefined. Returns {@code null} if + * both JVM property and environment variable are not set. + */ + @Nullable public static Long getLongProperty(String name) { + return saveProperty(name, getLongProperty0(name)); + } + + /** + * Gets the long value of either JVM property or environment variable, + * if property is not set. + * + * @param name Property name. + * @param dflt Default value. + * @return JVM property value or environment variable value if + * JVM property is {@code null} or default value if both + * are {@code null}. + */ + @SuppressWarnings("ConstantConditions") + public static long getLongProperty(String name, long dflt) { + Long ret = getLongProperty(name); + + return saveProperty(name, ret != null ? ret : dflt); + } + + /** + * Helper method for getting long property. + * + * @param name Property name. + * @return JVM property value or environment variable value if + * JVM property is undefined. Returns {@code null} if + * both JVM property and environment variable are not set. 
+ */ + @Nullable private static Long getLongProperty0(String name) { + Long ret = Long.getLong(name); + + if (ret == null) { + String env = System.getenv(name); + + ret = env != null ? Long.valueOf(env) : null; + } + + return ret; + } + + /** + * Gets the boolean value of either JVM property or environment variable, + * if property is not set. + * + * @param name Property name. + * @param dflt Default value. + * @return JVM property value or environment variable value if + * JVM property is {@code null} or default value if both + * are {@code null}. + */ + @SuppressWarnings("ConstantConditions") + public static boolean getBooleanProperty(String name, boolean dflt) { + Boolean ret = Boolean.getBoolean(name); + + if (ret == null) { + String env = System.getenv(name); + + ret = env != null ? Boolean.valueOf(env) : null; + } + + return saveProperty(name, ret != null ? ret : dflt); + } + + /** + * Prints a message about undefined JVM property to standard + * error. + * + * @param propName JVM property name. + */ + public static void printErrorUndefined(String propName) { + System.err.println("JVM property " + propName + " should be defined " + + "(use -D" + propName + "=...)"); + } + + /** + * Dumps the properties (name + value), that were retrieved using + * {@code get[type]Property()}. + * + * @param out Output stream to dump properties to. + */ + public static void dumpProperties(PrintStream out) { + for (Map.Entry<String, String> prop : props.get().entrySet()) + out.println(prop.getKey() + ": " + prop.getValue()); + } + + /** + * Helper method for saving a property to thread local for later use in + * {@link #dumpProperties(PrintStream)}. + * + * @param name Property name. + * @param val Property value. + * @return Property value. + */ + @Nullable private static <T> T saveProperty(String name, @Nullable T val) { + props.get().put(name, val != null ? 
val.toString() : null); + + return val; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/benchmarks/storevalbytes/GridCacheStoreValueBytesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/benchmarks/storevalbytes/GridCacheStoreValueBytesTest.java b/modules/core/src/test/java/org/gridgain/benchmarks/storevalbytes/GridCacheStoreValueBytesTest.java index 1785f83..04ff2dc 100644 --- a/modules/core/src/test/java/org/gridgain/benchmarks/storevalbytes/GridCacheStoreValueBytesTest.java +++ b/modules/core/src/test/java/org/gridgain/benchmarks/storevalbytes/GridCacheStoreValueBytesTest.java @@ -22,7 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.loadtests.util.*; +import org.apache.ignite.loadtests.util.*; import java.util.*; import java.util.concurrent.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/grid/loadtest/swapspace/GridFileSwapSpaceSpiMultithreadedLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/loadtest/swapspace/GridFileSwapSpaceSpiMultithreadedLoadTest.java b/modules/core/src/test/java/org/gridgain/grid/loadtest/swapspace/GridFileSwapSpaceSpiMultithreadedLoadTest.java index d2fd8f9..2746124 100644 --- a/modules/core/src/test/java/org/gridgain/grid/loadtest/swapspace/GridFileSwapSpaceSpiMultithreadedLoadTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/loadtest/swapspace/GridFileSwapSpaceSpiMultithreadedLoadTest.java @@ -23,7 +23,7 @@ import org.apache.ignite.spi.*; import org.apache.ignite.spi.swapspace.*; import org.apache.ignite.spi.swapspace.file.*; import 
org.apache.ignite.internal.util.typedef.*; -import org.gridgain.loadtests.util.*; +import org.apache.ignite.loadtests.util.*; import org.gridgain.testframework.*; import org.gridgain.testframework.junits.common.*; import org.jdk8.backport.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/GridCacheLoadPopulationTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/GridCacheLoadPopulationTask.java b/modules/core/src/test/java/org/gridgain/loadtests/GridCacheLoadPopulationTask.java deleted file mode 100644 index f8c6d96..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/GridCacheLoadPopulationTask.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * - */ -public class GridCacheLoadPopulationTask extends ComputeTaskSplitAdapter<Void, Void> { - /** Serial version UID. 
*/ - private static final long serialVersionUID = 1L; - - /** {@inheritDoc} */ - @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ - @Override protected Collection<? extends ComputeJob> split(int gridSize, Void arg) throws IgniteCheckedException { - Collection<ChunkPopulationJob> jobs = new ArrayList<>(); - - int maxElements = 10000; - int currStartElement = 0; - - while (currStartElement < GridCacheMultiNodeLoadTest.ELEMENTS_COUNT) { - jobs.add(new ChunkPopulationJob(currStartElement, maxElements)); - - currStartElement += maxElements; - } - - return jobs; - } - - /** - * Chunk population job. - */ - private static class ChunkPopulationJob implements ComputeJob { - /** Serial version UID. */ - private static final long serialVersionUID = 1L; - - /** Start element index. */ - private int startElementIdx; - - /** Mex elements. */ - private int maxElements; - - /** Injected grid. */ - @IgniteInstanceResource - private Ignite g; - - /** - * Creates chunk population job. - * - * @param startElementIdx Start element index. - * @param maxElements Max elements. 
- */ - ChunkPopulationJob(int startElementIdx, int maxElements) { - this.startElementIdx = startElementIdx; - this.maxElements = maxElements; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked", "ConstantConditions"}) - @Override public Object execute() throws IgniteCheckedException { - Map<Object, TestValue> map = new TreeMap<>(); - - for (int i = startElementIdx; i < startElementIdx + maxElements; i++) { - if (i >= GridCacheMultiNodeLoadTest.ELEMENTS_COUNT) - break; - - Object key = UUID.randomUUID(); - - map.put(key, new TestValue(key, i)); - } - - g.log().info("Putting values to partitioned cache [nodeId=" + g.cluster().localNode().id() + ", mapSize=" + - map.size() + ']'); - - g.cache(GridCacheMultiNodeLoadTest.CACHE_NAME).putAll(map); - - return null; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - // No-op. - } - } -} - -/** - * Test value. - */ -@SuppressWarnings("ClassNameDiffersFromFileName") -class TestValue { - /** Value key. */ - private Object key; - - /** Value data. */ - private String someData; - - /** - * Constructs test value. - * - * @param key Key. - * @param id Data. - */ - TestValue(Object key, Object id) { - this.key = key; - someData = key + "_" + id + "_" + System.currentTimeMillis(); - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(TestValue.class, this); - } - - /** - * @return Key. - */ - public Object key() { - return key; - } - - /** - * @return Value data. 
- */ - public String someData() { - return someData; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/GridCacheMultiNodeLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/GridCacheMultiNodeLoadTest.java b/modules/core/src/test/java/org/gridgain/loadtests/GridCacheMultiNodeLoadTest.java deleted file mode 100644 index 020f7de..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/GridCacheMultiNodeLoadTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.loadtests; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.eviction.lru.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.gridgain.testframework.junits.common.*; - -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.cache.GridCachePreloadMode.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; - -/** - * Multi-node cache test. - */ -public class GridCacheMultiNodeLoadTest extends GridCommonAbstractTest { - /** Cache name. */ - public static final String CACHE_NAME = "partitioned"; - - /** Elements count. */ - public static final int ELEMENTS_COUNT = 200000; - - /** Grid 1. */ - private static Ignite ignite1; - - /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(spi); - - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setName(CACHE_NAME); - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setDistributionMode(PARTITIONED_ONLY); - cacheCfg.setSwapEnabled(false); - cacheCfg.setStartSize(10); - cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - - cacheCfg.setEvictionPolicy(new GridCacheLruEvictionPolicy(100000)); - cacheCfg.setBackups(1); - - cacheCfg.setPreloadMode(SYNC); - - cfg.setCacheConfiguration(cacheCfg); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception 
{ - ignite1 = startGrid(1); - startGrid(2); - - ignite1.cache(CACHE_NAME); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - ignite1 = null; - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return Long.MAX_VALUE; - } - - /** - * @throws Exception If test failed. - */ - public void testMany() throws Exception { - ignite1.compute().execute(GridCacheLoadPopulationTask.class, null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/cache/GridCacheAbstractLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/cache/GridCacheAbstractLoadTest.java b/modules/core/src/test/java/org/gridgain/loadtests/cache/GridCacheAbstractLoadTest.java deleted file mode 100644 index d6af38f..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/cache/GridCacheAbstractLoadTest.java +++ /dev/null @@ -1,376 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.loadtests.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.apache.log4j.*; -import org.apache.log4j.varia.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.testframework.*; -import org.gridgain.testframework.junits.logger.*; -import org.jetbrains.annotations.*; -import org.springframework.beans.*; -import org.springframework.context.*; -import org.springframework.context.support.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Common stuff for cache load tests. - */ -abstract class GridCacheAbstractLoadTest { - /** Random. */ - protected static final Random RAND = new Random(); - - /** Default configuration file path. */ - protected static final String CONFIG_FILE = "modules/tests/config/spring-cache-load.xml"; - - /** Default log file path. */ - protected static final String LOG_FILE = "cache-load.log"; - - /** Whether to use transactions. */ - protected final boolean tx; - - /** Operations per transaction. */ - protected final int operationsPerTx; - - /** Transaction isolation level. */ - protected final IgniteTxIsolation isolation; - - /** Transaction concurrency control. */ - protected final IgniteTxConcurrency concurrency; - - /** Threads count. */ - protected final int threads; - - /** Write ratio. */ - protected final double writeRatio; - - /** Test duration. */ - protected final long testDuration; - - /** Value size. */ - protected final int valSize; - - /** */ - protected static final int WRITE_LOG_MOD = 100; - - /** */ - protected static final int READ_LOG_MOD = 1000; - - /** Reads. */ - protected final AtomicLong reads = new AtomicLong(); - - /** Reads. 
*/ - protected final AtomicLong readTime = new AtomicLong(); - - /** Writes. */ - protected final AtomicLong writes = new AtomicLong(); - - /** Writes. */ - protected final AtomicLong writeTime = new AtomicLong(); - - /** Done flag. */ - protected final AtomicBoolean done = new AtomicBoolean(); - - /** */ - protected GridCacheAbstractLoadTest() { - Properties props = new Properties(); - - try { - props.load(new FileReader(GridTestUtils.resolveGridGainPath( - "modules/tests/config/cache-load.properties"))); - } - catch (IOException e) { - throw new RuntimeException(e); - } - - tx = Boolean.valueOf(props.getProperty("transactions")); - operationsPerTx = Integer.valueOf(props.getProperty("operations.per.tx")); - isolation = IgniteTxIsolation.valueOf(props.getProperty("isolation")); - concurrency = IgniteTxConcurrency.valueOf(props.getProperty("concurrency")); - threads = Integer.valueOf(props.getProperty("threads")); - writeRatio = Double.valueOf(props.getProperty("write.ratio")); - testDuration = Long.valueOf(props.getProperty("duration")); - valSize = Integer.valueOf(props.getProperty("value.size")); - } - - /** - * @param writeClos Write closure. - * @param readClos ReadClosure. 
- */ - protected void loadTest(final CIX1<GridCacheProjection<Integer, Integer>> writeClos, - final CIX1<GridCacheProjection<Integer, Integer>> readClos) { - info("Read threads: " + readThreads()); - info("Write threads: " + writeThreads()); - info("Test duration (ms): " + testDuration); - - Ignite ignite = G.ignite(); - - final GridCache<Integer, Integer> cache = ignite.cache(null); - - assert cache != null; - - try { - IgniteFuture<?> f1 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - long start = System.currentTimeMillis(); - - while (!done.get()) { - if (tx) { - try (IgniteTx tx = cache.txStart()) { - writeClos.apply(cache); - - tx.commit(); - } - } - else - writeClos.apply(cache); - } - - writeTime.addAndGet(System.currentTimeMillis() - start); - - return null; - } - }, writeThreads(), "cache-load-test-worker"); - - IgniteFuture<?> f2 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - long start = System.currentTimeMillis(); - - while(!done.get()) { - if (tx) { - try (IgniteTx tx = cache.txStart()) { - readClos.apply(cache); - - tx.commit(); - } - } - else - readClos.apply(cache); - } - - readTime.addAndGet(System.currentTimeMillis() - start); - - return null; - } - }, readThreads(), "cache-load-test-worker"); - - Thread.sleep(testDuration); - - done.set(true); - - f1.get(); - f2.get(); - - info("Test stats: "); - info(" total-threads = " + threads); - info(" write-ratio = " + writeRatio); - info(" total-runs = " + (reads.get() + writes.get())); - info(" total-reads = " + reads); - info(" total-writes = " + writes); - info(" read-time (ms) = " + readTime); - info(" write-time (ms) = " + writeTime); - info(" avg-read-time (ms) = " + ((double)readTime.get() / reads.get())); - info(" avg-write-time (ms) = " + ((double)writeTime.get() / writes.get())); - } - catch (Exception e) { - throw new RuntimeException(e); 
- } - } - - /** - * @return Write threads count. - */ - @SuppressWarnings({"ConstantConditions"}) - protected int writeThreads() { - int ratio = (int)(threads * writeRatio); - - return writeRatio == 0 ? 0 : ratio == 0 ? 1 : ratio; - } - - /** - * @return Read threads count. - */ - @SuppressWarnings({"ConstantConditions"}) - protected int readThreads() { - int ratio = (int)(threads * (1 - writeRatio)); - - return Double.compare(writeRatio, 1) == 0 ? 0 : ratio == 0 ? 1 : ratio; - } - - /** - * @param msg Message to print. - */ - protected static void info(String msg) { - System.out.println(msg); - } - - /** - * @param msg Message to print. - */ - protected static void error(String msg) { - System.err.println(msg); - } - - /** - * Initializes logger. - * - * @param log Log file name. - * @return Logger. - * @throws IgniteCheckedException If file initialization failed. - */ - protected IgniteLogger initLogger(String log) throws IgniteCheckedException { - Logger impl = Logger.getRootLogger(); - - impl.removeAllAppenders(); - - String fileName = U.getGridGainHome() + "/work/log/" + log; - - // Configure output that should go to System.out - RollingFileAppender fileApp; - - String fmt = "[%d{ABSOLUTE}][%-5p][%t][%c{1}] %m%n"; - - try { - fileApp = new RollingFileAppender(new PatternLayout(fmt), fileName); - - fileApp.setMaxBackupIndex(0); - fileApp.setAppend(false); - - // fileApp.rollOver(); - - fileApp.activateOptions(); - } - catch (IOException e) { - throw new IgniteCheckedException("Unable to initialize file appender.", e); - } - - LevelRangeFilter lvlFilter = new LevelRangeFilter(); - - lvlFilter.setLevelMin(Level.DEBUG); - - fileApp.addFilter(lvlFilter); - - impl.addAppender(fileApp); - - // Configure output that should go to System.out - ConsoleAppender conApp = new ConsoleAppender(new PatternLayout(fmt), ConsoleAppender.SYSTEM_OUT); - - lvlFilter = new LevelRangeFilter(); - - lvlFilter.setLevelMin(Level.DEBUG); - lvlFilter.setLevelMax(Level.INFO); - - 
conApp.addFilter(lvlFilter); - - conApp.activateOptions(); - - impl.addAppender(conApp); - - // Configure output that should go to System.err - conApp = new ConsoleAppender(new PatternLayout(fmt), ConsoleAppender.SYSTEM_ERR); - - conApp.setThreshold(Level.WARN); - - conApp.activateOptions(); - - impl.addAppender(conApp); - - impl.setLevel(Level.INFO); - - //Logger.getLogger("org.gridgain").setLevel(Level.INFO); - //Logger.getLogger(GridCacheVersionManager.class).setLevel(Level.DEBUG); - - return new GridTestLog4jLogger(false); - } - - /** - * Initializes configurations. - * - * @param springCfgPath Configuration file path. - * @param log Log file name. - * @return Configuration. - * @throws IgniteCheckedException If fails. - */ - @SuppressWarnings("unchecked") - protected IgniteConfiguration configuration(String springCfgPath, String log) throws IgniteCheckedException { - File path = GridTestUtils.resolveGridGainPath(springCfgPath); - - if (path == null) - throw new IgniteCheckedException("Spring XML configuration file path is invalid: " + new File(springCfgPath) + - ". Note that this path should be either absolute path or a relative path to GRIDGAIN_HOME."); - - if (!path.isFile()) - throw new IgniteCheckedException("Provided file path is not a file: " + path); - - // Add no-op logger to remove no-appender warning. - Appender app = new NullAppender(); - - Logger.getRootLogger().addAppender(app); - - ApplicationContext springCtx; - - try { - springCtx = new FileSystemXmlApplicationContext(path.toURI().toURL().toString()); - } - catch (BeansException | MalformedURLException e) { - throw new IgniteCheckedException("Failed to instantiate Spring XML application context: " + e.getMessage(), e); - } - - Map cfgMap; - - try { - // Note: Spring is not generics-friendly. 
- cfgMap = springCtx.getBeansOfType(IgniteConfiguration.class); - } - catch (BeansException e) { - throw new IgniteCheckedException("Failed to instantiate bean [type=" + IgniteConfiguration.class + ", err=" + - e.getMessage() + ']', e); - } - - if (cfgMap == null) - throw new IgniteCheckedException("Failed to find a single grid factory configuration in: " + path); - - // Remove previously added no-op logger. - Logger.getRootLogger().removeAppender(app); - - if (cfgMap.isEmpty()) - throw new IgniteCheckedException("Can't find grid factory configuration in: " + path); - else if (cfgMap.size() > 1) - throw new IgniteCheckedException("More than one configuration provided for cache load test: " + cfgMap.values()); - - IgniteConfiguration cfg = (IgniteConfiguration)cfgMap.values().iterator().next(); - - cfg.setGridLogger(initLogger(log)); - - cfg.getTransactionsConfiguration().setDefaultTxIsolation(isolation); - cfg.getTransactionsConfiguration().setDefaultTxConcurrency(concurrency); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1d8f69b/modules/core/src/test/java/org/gridgain/loadtests/cache/GridCacheAffinityTransactionsOffHeapTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/loadtests/cache/GridCacheAffinityTransactionsOffHeapTest.java b/modules/core/src/test/java/org/gridgain/loadtests/cache/GridCacheAffinityTransactionsOffHeapTest.java deleted file mode 100644 index 01b7bd8..0000000 --- a/modules/core/src/test/java/org/gridgain/loadtests/cache/GridCacheAffinityTransactionsOffHeapTest.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.loadtests.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheMemoryMode.*; -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; - -/** - */ -public class GridCacheAffinityTransactionsOffHeapTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final int NODE_CNT = 4; - - /** */ - private static final int THREAD_CNT = 1; - - /** */ - private static final int KEY_CNT = 10; - - /** - * @param args Command line arguments. - * @throws Exception In case of error. 
- */ - public static void main(String[] args) throws Exception { - startNodes(); - - for (int i = 0; i < KEY_CNT; i++) { - GridCache<Object, Integer> c = cache(i); - - c.putx((long)i, 0); - c.putx(new UserKey(i, 0), 0); - c.putx(new UserKey(i, 1), 0); - c.putx(new UserKey(i, 2), 0); - } - - assert cache(5).get(5L) != null; - - long key = 5; - - GridCache<Object, Integer> c = cache(key); - - try (IgniteTx tx = c.txStartAffinity(key, PESSIMISTIC, REPEATABLE_READ, 0, 0)) { - Integer val = c.get(key); - Integer userVal1 = c.get(new UserKey(key, 0)); - Integer userVal2 = c.get(new UserKey(key, 1)); - Integer userVal3 = c.get(new UserKey(key, 2)); - - assert val != null; - assert userVal1 != null; - assert userVal2 != null; - assert userVal3 != null; - - assert userVal1.equals(val); - assert userVal2.equals(val); - assert userVal3.equals(val); - - int newVal = val + 1; - - c.putx(key, newVal); - c.putx(new UserKey(key, 0), newVal); - c.putx(new UserKey(key, 1), newVal); - c.putx(new UserKey(key, 2), newVal); - - tx.commit(); - } - -// final AtomicLong txCnt = new AtomicLong(); -// -// GridTestUtils.runMultiThreaded( -// new Callable<Object>() { -// @Override public Object call() throws Exception { -// Random rnd = new Random(); -// -// while (!Thread.currentThread().isInterrupted()) { -// long key = rnd.nextInt(KEY_CNT); -// -// GridCache<Object, Integer> c = cache(key); -// -// try (GridCacheTx tx = c.txStartAffinity(key, PESSIMISTIC, REPEATABLE_READ, 0, 0)) { -// Integer val = c.get(key); -// Integer userVal1 = c.get(new UserKey(key, 0)); -// Integer userVal2 = c.get(new UserKey(key, 1)); -// Integer userVal3 = c.get(new UserKey(key, 2)); -// -// assert val != null; -// assert userVal1 != null; -// assert userVal2 != null; -// assert userVal3 != null; -// -// assert userVal1.equals(val); -// assert userVal2.equals(val); -// assert userVal3.equals(val); -// -// int newVal = val + 1; -// -// c.putx(key, newVal); -// c.putx(new UserKey(key, 0), newVal); -// c.putx(new 
UserKey(key, 1), newVal); -// c.putx(new UserKey(key, 2), newVal); -// -// tx.commit(); -// } -// -// long txDone = txCnt.incrementAndGet(); -// -// if (txDone % 1000 == 0) -// System.out.println("Transactions done: " + txDone); -// } -// -// return null; -// } -// }, -// THREAD_CNT, -// "test-thread" -// ); - } - - /** - * @param key Key. - * @return Cache. - */ - private static GridCache<Object, Integer> cache(long key) { - UUID id = Ignition.ignite("grid-0").cache(null).affinity().mapKeyToNode(key).id(); - - return Ignition.ignite(id).cache(null); - } - - /** - * @throws IgniteCheckedException In case of error. - */ - private static void startNodes() throws IgniteCheckedException { - for (int i = 0; i < NODE_CNT; i++) - Ignition.start(getConfiguration("grid-" + i)); - } - - /** - * @param name Grid name. - * @return Configuration. - */ - private static IgniteConfiguration getConfiguration(String name) { - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setGridName(name); - - CacheConfiguration cacheCfg = new CacheConfiguration(); - - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - cacheCfg.setMemoryMode(OFFHEAP_TIERED); - cacheCfg.setOffHeapMaxMemory(0); - cacheCfg.setBackups(1); - - cfg.setCacheConfiguration(cacheCfg); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(disco); - - return cfg; - } - - /** - */ - private static class UserKey implements Externalizable { - /** */ - @GridCacheAffinityKeyMapped - private long affKey; - - /** */ - private int idx; - - /** - */ - public UserKey() { - // No-op. - } - - /** - * @param affKey Affinity key. - * @param idx Index. 
- */ - private UserKey(long affKey, int idx) { - this.affKey = affKey; - this.idx = idx; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(affKey); - out.writeInt(idx); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - affKey = in.readLong(); - idx = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - UserKey key = (UserKey)o; - - return affKey == key.affKey && idx == key.idx; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int result = (int)(affKey ^ (affKey >>> 32)); - - result = 31 * result + idx; - - return result; - } - } -}