http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessorSelfTest.java new file mode 100644 index 0000000..4aff1af --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessorSelfTest.java @@ -0,0 +1,970 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public class DataStreamerProcessorSelfTest extends GridCommonAbstractTest { + /** */ + private static ConcurrentHashMap<Object, Object> storeMap; + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private CacheMode mode = PARTITIONED; + + /** */ + private boolean nearEnabled = true; + + /** */ + private boolean useCache; + + /** */ + private TestStore store; + + /** {@inheritDoc} */ + @Override public void afterTest() throws Exception { + super.afterTest(); + + useCache = false; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "unchecked"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + cfg.setIncludeProperties(); + + cfg.setMarshaller(new OptimizedMarshaller(false)); + + if (useCache) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(mode); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); + cc.setWriteSynchronizationMode(FULL_SYNC); + + cc.setEvictSynchronized(false); + cc.setEvictNearSynchronized(false); + + if (store != null) { + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + } + + cfg.setCacheConfiguration(cc); + } + else + cfg.setCacheConfiguration(); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPartitioned() throws Exception { + mode = PARTITIONED; + + checkDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testColocated() throws Exception { + mode = PARTITIONED; + nearEnabled = false; + + checkDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicated() throws Exception { + mode = REPLICATED; + + checkDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testLocal() throws Exception { + mode = LOCAL; + + try { + checkDataStreamer(); + + assert false; + } + catch (IgniteCheckedException e) { + // Cannot load local cache configured remotely. + info("Caught expected exception: " + e); + } + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("ErrorNotRethrown") + private void checkDataStreamer() throws Exception { + try { + Ignite g1 = startGrid(1); + + useCache = true; + + Ignite g2 = startGrid(2); + startGrid(3); + + final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); + + ldr.updater(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); + + final AtomicInteger idxGen = new AtomicInteger(); + final int cnt = 400; + final int threads = 10; + + final CountDownLatch l1 = new CountDownLatch(threads); + + IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + int idx = idxGen.getAndIncrement(); + + futs.add(ldr.addData(idx, idx)); + } + + l1.countDown(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, threads); + + l1.await(); + + // This will wait until data streamer finishes loading. + stopGrid(getTestGridName(1), false); + + f1.get(); + + int s2 = internalCache(2).primaryKeySet().size(); + int s3 = internalCache(3).primaryKeySet().size(); + int total = threads * cnt; + + assertEquals(total, s2 + s3); + + final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null); + + rmvLdr.updater(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); + + final CountDownLatch l2 = new CountDownLatch(threads); + + IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + final int key = idxGen.decrementAndGet(); + + futs.add(rmvLdr.removeData(key)); + } + + l2.countDown(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, threads); + + l2.await(); + + rmvLdr.close(false); + + f2.get(); + + s2 = internalCache(2).primaryKeySet().size(); + s3 = internalCache(3).primaryKeySet().size(); + + assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']'; + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedIsolated() throws Exception { + mode = PARTITIONED; + + checkIsolatedDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedIsolated() throws Exception { + mode = REPLICATED; + + checkIsolatedDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + private void checkIsolatedDataStreamer() throws Exception { + try { + useCache = true; + + Ignite g1 = startGrid(0); + startGrid(1); + startGrid(2); + + awaitPartitionMapExchange(); + + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + for (int i = 0; i < 100; i++) + cache.put(i, -1); + + final int cnt = 40_000; + final int threads = 10; + + try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) { + final AtomicInteger idxGen = new AtomicInteger(); + + IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < cnt; i++) { + int idx = idxGen.getAndIncrement(); + + ldr.addData(idx, idx); + } + + return null; + } + }, threads); + + f1.get(); + } + + for (int g = 0; g < 3; g++) { + ClusterNode locNode = grid(g).localNode(); + + GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null); + + if (cache0.isNear()) + cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht(); + + CacheAffinity<Integer> aff = cache0.affinity(); + + for (int key = 0; key < cnt * threads; key++) { + if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) { + GridCacheEntryEx entry = cache0.peekEx(key); + + assertNotNull("Missing entry for key: " + key, entry); + assertEquals((key < 100 ? -1 : key), + CU.value(entry.rawGetOrUnmarshal(false), cache0.context(), false)); + } + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * Test primitive arrays can be passed into data streamer. + * + * @throws Exception If failed. + */ + public void testPrimitiveArrays() throws Exception { + try { + useCache = true; + mode = PARTITIONED; + + Ignite g1 = startGrid(1); + startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used). + + List<Object> arrays = Arrays.<Object>asList( + new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4}, + new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8}); + + IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null); + + for (int i = 0, size = arrays.size(); i < 1000; i++) { + Object arr = arrays.get(i % size); + + dataLdr.addData(i, arr); + dataLdr.addData(i, fixedClosure(arr)); + } + + dataLdr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedMultiThreaded() throws Exception { + mode = REPLICATED; + + checkLoaderMultithreaded(1, 2); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedMultiThreaded() throws Exception { + mode = PARTITIONED; + + checkLoaderMultithreaded(1, 3); + } + + /** + * Tests loader in multithreaded environment with various count of grids started. + * + * @param nodesCntNoCache How many nodes should be started without cache. + * @param nodesCntCache How many nodes should be started with cache. + * @throws Exception If failed. + */ + protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache) + throws Exception { + try { + // Start all required nodes. + int idx = 1; + + for (int i = 0; i < nodesCntNoCache; i++) + startGrid(idx++); + + useCache = true; + + for (int i = 0; i < nodesCntCache; i++) + startGrid(idx++); + + Ignite g1 = grid(1); + + // Get and configure loader. + final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); + + ldr.updater(DataStreamerCacheUpdaters.<Integer, Integer>individual()); + ldr.perNodeBufferSize(2); + + // Define count of puts. + final AtomicInteger idxGen = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + try { + final int totalPutCnt = 50000; + + IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(); + + while (!done.get()) { + int idx = idxGen.getAndIncrement(); + + if (idx >= totalPutCnt) { + info(">>> Stopping producer thread since maximum count of puts reached."); + + break; + } + + futs.add(ldr.addData(idx, idx)); + } + + ldr.flush(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, 5, "producer"); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!done.get()) { + ldr.flush(); + + U.sleep(100); + } + + return null; + } + }, 1, "flusher"); + + // Define index of node being restarted. + final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1; + + IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + for (int i = 0; i < 5; i++) { + Ignite g = startGrid(restartNodeIdx); + + UUID id = g.cluster().localNode().id(); + + info(">>>>>>> Started node: " + id); + + U.sleep(1000); + + stopGrid(getTestGridName(restartNodeIdx), true); + + info(">>>>>>> Stopped node: " + id); + } + } + finally { + done.set(true); + + info("Start stop thread finished."); + } + + return null; + } + }, 1, "start-stop-thread"); + + fut1.get(); + fut2.get(); + fut3.get(); + } + finally { + ldr.close(false); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testLoaderApi() throws Exception { + useCache = true; + + try { + Ignite g1 = startGrid(1); + + IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null); + + ldr.close(false); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + ldr.future().get(); + + try { + // Create another loader. + ldr = g1.dataStreamer("UNKNOWN_CACHE"); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + ldr.close(true); + + assert ldr.future().isDone(); + + ldr.future().get(); + + // Create another loader. + ldr = g1.dataStreamer(null); + + // Cancel with future. + ldr.future().cancel(); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + try { + ldr.future().get(); + + assert false; + } + catch (IgniteFutureCancelledException e) { + info("Caught expected exception: " + e); + } + + // Create another loader. + ldr = g1.dataStreamer(null); + + // This will close loader. + stopGrid(getTestGridName(1), false); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + ldr.future().get(); + } + finally { + stopAllGrids(); + } + } + + /** + * Wraps integer to closure returning it. + * + * @param i Value to wrap. + * @return Callable. + */ + private static Callable<Integer> callable(@Nullable final Integer i) { + return new Callable<Integer>() { + @Override public Integer call() throws Exception { + return i; + } + }; + } + + /** + * Wraps integer to closure returning it. + * + * @param i Value to wrap. + * @return Closure. + */ + private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) { + return new IgniteClosure<Integer, Integer>() { + @Override public Integer apply(Integer e) { + return e == null ? i : e + i; + } + }; + } + + /** + * Wraps object to closure returning it. + * + * @param obj Value to wrap. + * @return Closure. + */ + private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) { + return new IgniteClosure<T, T>() { + @Override public T apply(T e) { + assert e == null || obj == null || e.getClass() == obj.getClass() : + "Expects the same types [e=" + e + ", obj=" + obj + ']'; + + return obj; + } + }; + } + + /** + * Wraps integer to closure expecting it and returning {@code null}. + * + * @param exp Expected closure value. + * @return Remove expected cache value closure. + */ + private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) { + return new IgniteClosure<T, T>() { + @Override public T apply(T act) { + if (exp == null ? act == null : exp.equals(act)) + return null; + + throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']'); + } + }; + } + + /** + * @throws Exception If failed. + */ + public void testFlush() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + final IgniteCache<Integer, Integer> c = g.jcache(null); + + final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); + + ldr.perNodeBufferSize(10); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + multithreaded(new Callable<Void>() { + @Override + public Void call() throws Exception { + ldr.flush(); + + assertEquals(9, c.size()); + + return null; + } + }, 5, "flush-checker"); + + ldr.addData(100, 100); + + ldr.flush(); + + assertEquals(10, c.size()); + + ldr.addData(200, 200); + + ldr.close(false); + + ldr.future().get(); + + assertEquals(11, c.size()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testTryFlush() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + IgniteCache<Integer, Integer> c = g.jcache(null); + + IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); + + ldr.perNodeBufferSize(10); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + ldr.tryFlush(); + + Thread.sleep(100); + + assertEquals(9, c.size()); + + ldr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testFlushTimeout() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + final CountDownLatch latch = new CountDownLatch(9); + + g.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + latch.countDown(); + + return true; + } + }, EVT_CACHE_OBJECT_PUT); + + IgniteCache<Integer, Integer> c = g.jcache(null); + + assertTrue(c.localSize() == 0); + + IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); + + ldr.perNodeBufferSize(10); + ldr.autoFlushFrequency(3000); + ldr.allowOverwrite(true); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + assertFalse(latch.await(1000, MILLISECONDS)); + + assertTrue(c.localSize() == 0); + + assertTrue(latch.await(3000, MILLISECONDS)); + + assertEquals(9, c.size()); + + ldr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testUpdateStore() throws Exception { + storeMap = new ConcurrentHashMap<>(); + + try { + store = new TestStore(); + + useCache = true; + + Ignite ignite = startGrid(1); + + startGrid(2); + startGrid(3); + + for (int i = 0; i < 1000; i++) + storeMap.put(i, i); + + try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) { + ldr.allowOverwrite(true); + + assertFalse(ldr.skipStore()); + + for (int i = 0; i < 1000; i++) + ldr.removeData(i); + + for (int i = 1000; i < 2000; i++) + ldr.addData(i, i); + } + + for (int i = 0; i < 1000; i++) + assertNull(storeMap.get(i)); + + for (int i = 1000; i < 2000; i++) + assertEquals(i, storeMap.get(i)); + + try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) { + ldr.allowOverwrite(true); + + ldr.skipStore(true); + + for (int i = 0; i < 1000; i++) + ldr.addData(i, i); + + for (int i = 1000; i < 2000; i++) + ldr.removeData(i); + } + + IgniteCache<Object, Object> cache = ignite.jcache(null); + + for (int i = 0; i < 1000; i++) { + assertNull(storeMap.get(i)); + + assertEquals(i, cache.get(i)); + } + + for (int i = 1000; i < 2000; i++) { + assertEquals(i, storeMap.get(i)); + + assertNull(cache.localPeek(i, CachePeekMode.ONHEAP)); + } + } + finally { + storeMap = null; + } + } + + /** + * @throws Exception If failed. + */ + public void testCustomUserUpdater() throws Exception { + useCache = true; + + try { + Ignite ignite = startGrid(1); + + startGrid(2); + startGrid(3); + + try (IgniteDataStreamer<String, TestObject> ldr = ignite.dataStreamer(null)) { + ldr.allowOverwrite(true); + + ldr.updater(new IgniteDataStreamer.Updater<String, TestObject>() { + @Override public void update(IgniteCache<String, TestObject> cache, + Collection<Map.Entry<String, TestObject>> entries) { + for (Map.Entry<String, TestObject> e : entries) { + assertTrue(e.getKey() instanceof String); + assertTrue(e.getValue() instanceof TestObject); + + cache.put(e.getKey(), new TestObject(e.getValue().val + 1)); + } + } + }); + + for (int i = 0; i < 100; i++) + ldr.addData(String.valueOf(i), new TestObject(i)); + } + + IgniteCache<String, TestObject> cache = ignite.jcache(null); + + for (int i = 0; i < 100; i++) { + TestObject val = cache.get(String.valueOf(i)); + + assertNotNull(val); + assertEquals(i + 1, val.val); + } + } + finally { + stopAllGrids(); + } + } + + /** + * + */ + private static class TestObject { + /** Value. */ + private final int val; + + /** + * @param val Value. + */ + private TestObject(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestObject obj = (TestObject)o; + + return val == obj.val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + } + + /** + * + */ + private static class TestStore extends CacheStoreAdapter<Object, Object> { + /** {@inheritDoc} */ + @Nullable @Override public Object load(Object key) { + return storeMap.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) { + storeMap.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + storeMap.remove(key); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java new file mode 100644 index 0000000..3d100e1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jdk8.backport.*; + +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.events.EventType.*; + +/** + * Data streamer performance test. Compares group lock data streamer to traditional lock. + * <p> + * Disable assertions and give at least 2 GB heap to run this test. + */ +public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int GRID_CNT = 3; + + /** */ + private static final int ENTRY_CNT = 80000; + + /** */ + private boolean useCache; + + /** */ + private String[] vals = new String[2048]; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(spi); + + cfg.setIncludeProperties(); + + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + cfg.setConnectorConfiguration(null); + + cfg.setPeerClassLoadingEnabled(true); + + if (useCache) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + + cc.setDistributionMode(PARTITIONED_ONLY); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setStartSize(ENTRY_CNT / GRID_CNT); + cc.setSwapEnabled(false); + + cc.setBackups(1); + + cfg.setCacheSanityCheckEnabled(false); + cfg.setCacheConfiguration(cc); + } + else + cfg.setCacheConfiguration(); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + for (int i = 0; i < vals.length; i++) { + int valLen = ThreadLocalRandom8.current().nextInt(128, 512); + + StringBuilder sb = new StringBuilder(); + + for (int j = 0; j < valLen; j++) + sb.append('a' + ThreadLocalRandom8.current().nextInt(20)); + + vals[i] = sb.toString(); + + info("Value: " + vals[i]); + } + } + + /** + * @throws Exception If failed. + */ + public void testPerformance() throws Exception { + doTest(); + } + + /** + * @throws Exception If failed. + */ + private void doTest() throws Exception { + System.gc(); + System.gc(); + System.gc(); + + try { + useCache = true; + + startGridsMultiThreaded(GRID_CNT); + + useCache = false; + + Ignite ignite = startGrid(); + + final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null); + + ldr.perNodeBufferSize(8192); + ldr.updater(DataStreamerCacheUpdaters.<Integer, String>batchedSorted()); + ldr.autoFlushFrequency(0); + + final LongAdder cnt = new LongAdder(); + + long start = U.currentTimeMillis(); + + Thread t = new Thread(new Runnable() { + @SuppressWarnings("BusyWait") + @Override public void run() { + while (true) { + try { + Thread.sleep(10000); + } + catch (InterruptedException ignored) { + break; + } + + info(">>> Adds/sec: " + cnt.sumThenReset() / 10); + } + } + }); + + t.setDaemon(true); + + t.start(); + + int threadNum = 2;//Runtime.getRuntime().availableProcessors(); + + multithreaded(new Callable<Object>() { + @SuppressWarnings("InfiniteLoopStatement") + @Override public Object call() throws Exception { + ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); + + while (true) { + int i = rnd.nextInt(ENTRY_CNT); + + ldr.addData(i, vals[rnd.nextInt(vals.length)]); + + cnt.increment(); + } + } + }, threadNum, "loader"); + + info("Closing loader..."); + + ldr.close(false); + + long duration = U.currentTimeMillis() - start; + + info("Finished performance test. Duration: " + duration + "ms."); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 96e041c..f34f101 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -32,7 +32,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.*; import org.apache.ignite.internal.processors.cache.distributed.replicated.preloader.*; import org.apache.ignite.internal.processors.cache.integration.*; import org.apache.ignite.internal.processors.cache.local.*; -import org.apache.ignite.internal.processors.datastream.*; +import org.apache.ignite.internal.processors.datastreamer.*; /** * Test suite.