http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java new file mode 100644 index 0000000..170875c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java @@ -0,0 +1,970 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastream; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public class DataStreamerProcessorSelfTest extends GridCommonAbstractTest { + /** */ + private static ConcurrentHashMap<Object, Object> storeMap; + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private CacheMode mode = PARTITIONED; + + /** */ + private boolean nearEnabled = true; + + /** */ + private boolean useCache; + + /** */ + private TestStore store; + + /** {@inheritDoc} */ + @Override public void afterTest() throws Exception { + super.afterTest(); + + useCache = false; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "unchecked"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + cfg.setIncludeProperties(); + + cfg.setMarshaller(new OptimizedMarshaller(false)); + + if (useCache) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(mode); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); + cc.setWriteSynchronizationMode(FULL_SYNC); + + cc.setEvictSynchronized(false); + cc.setEvictNearSynchronized(false); + + if (store != null) { + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + } + + cfg.setCacheConfiguration(cc); + } + else + cfg.setCacheConfiguration(); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPartitioned() throws Exception { + mode = PARTITIONED; + + checkDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testColocated() throws Exception { + mode = PARTITIONED; + nearEnabled = false; + + checkDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicated() throws Exception { + mode = REPLICATED; + + checkDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testLocal() throws Exception { + mode = LOCAL; + + try { + checkDataStreamer(); + + assert false; + } + catch (IgniteCheckedException e) { + // Cannot load local cache configured remotely. + info("Caught expected exception: " + e); + } + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("ErrorNotRethrown") + private void checkDataStreamer() throws Exception { + try { + Ignite g1 = startGrid(1); + + useCache = true; + + Ignite g2 = startGrid(2); + startGrid(3); + + final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); + + ldr.updater(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); + + final AtomicInteger idxGen = new AtomicInteger(); + final int cnt = 400; + final int threads = 10; + + final CountDownLatch l1 = new CountDownLatch(threads); + + IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + int idx = idxGen.getAndIncrement(); + + futs.add(ldr.addData(idx, idx)); + } + + l1.countDown(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, threads); + + l1.await(); + + // This will wait until data streamer finishes loading. + stopGrid(getTestGridName(1), false); + + f1.get(); + + int s2 = internalCache(2).primaryKeySet().size(); + int s3 = internalCache(3).primaryKeySet().size(); + int total = threads * cnt; + + assertEquals(total, s2 + s3); + + final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null); + + rmvLdr.updater(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); + + final CountDownLatch l2 = new CountDownLatch(threads); + + IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + final int key = idxGen.decrementAndGet(); + + futs.add(rmvLdr.removeData(key)); + } + + l2.countDown(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, threads); + + l2.await(); + + rmvLdr.close(false); + + f2.get(); + + s2 = internalCache(2).primaryKeySet().size(); + s3 = internalCache(3).primaryKeySet().size(); + + assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']'; + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedIsolated() throws Exception { + mode = PARTITIONED; + + checkIsolatedDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedIsolated() throws Exception { + mode = REPLICATED; + + checkIsolatedDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + private void checkIsolatedDataStreamer() throws Exception { + try { + useCache = true; + + Ignite g1 = startGrid(0); + startGrid(1); + startGrid(2); + + awaitPartitionMapExchange(); + + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + for (int i = 0; i < 100; i++) + cache.put(i, -1); + + final int cnt = 40_000; + final int threads = 10; + + try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) { + final AtomicInteger idxGen = new AtomicInteger(); + + IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < cnt; i++) { + int idx = idxGen.getAndIncrement(); + + ldr.addData(idx, idx); + } + + return null; + } + }, threads); + + f1.get(); + } + + for (int g = 0; g < 3; g++) { + ClusterNode locNode = grid(g).localNode(); + + GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null); + + if (cache0.isNear()) + cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht(); + + CacheAffinity<Integer> aff = cache0.affinity(); + + for (int key = 0; key < cnt * threads; key++) { + if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) { + GridCacheEntryEx entry = cache0.peekEx(key); + + assertNotNull("Missing entry for key: " + key, entry); + assertEquals((key < 100 ? -1 : key), + CU.value(entry.rawGetOrUnmarshal(false), cache0.context(), false)); + } + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * Test primitive arrays can be passed into data streamer. + * + * @throws Exception If failed. + */ + public void testPrimitiveArrays() throws Exception { + try { + useCache = true; + mode = PARTITIONED; + + Ignite g1 = startGrid(1); + startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used). + + List<Object> arrays = Arrays.<Object>asList( + new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4}, + new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8}); + + IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null); + + for (int i = 0, size = arrays.size(); i < 1000; i++) { + Object arr = arrays.get(i % size); + + dataLdr.addData(i, arr); + dataLdr.addData(i, fixedClosure(arr)); + } + + dataLdr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedMultiThreaded() throws Exception { + mode = REPLICATED; + + checkLoaderMultithreaded(1, 2); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedMultiThreaded() throws Exception { + mode = PARTITIONED; + + checkLoaderMultithreaded(1, 3); + } + + /** + * Tests loader in multithreaded environment with various count of grids started. + * + * @param nodesCntNoCache How many nodes should be started without cache. + * @param nodesCntCache How many nodes should be started with cache. + * @throws Exception If failed. + */ + protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache) + throws Exception { + try { + // Start all required nodes. + int idx = 1; + + for (int i = 0; i < nodesCntNoCache; i++) + startGrid(idx++); + + useCache = true; + + for (int i = 0; i < nodesCntCache; i++) + startGrid(idx++); + + Ignite g1 = grid(1); + + // Get and configure loader. + final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); + + ldr.updater(DataStreamerCacheUpdaters.<Integer, Integer>individual()); + ldr.perNodeBufferSize(2); + + // Define count of puts. + final AtomicInteger idxGen = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + try { + final int totalPutCnt = 50000; + + IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(); + + while (!done.get()) { + int idx = idxGen.getAndIncrement(); + + if (idx >= totalPutCnt) { + info(">>> Stopping producer thread since maximum count of puts reached."); + + break; + } + + futs.add(ldr.addData(idx, idx)); + } + + ldr.flush(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, 5, "producer"); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!done.get()) { + ldr.flush(); + + U.sleep(100); + } + + return null; + } + }, 1, "flusher"); + + // Define index of node being restarted. + final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1; + + IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + for (int i = 0; i < 5; i++) { + Ignite g = startGrid(restartNodeIdx); + + UUID id = g.cluster().localNode().id(); + + info(">>>>>>> Started node: " + id); + + U.sleep(1000); + + stopGrid(getTestGridName(restartNodeIdx), true); + + info(">>>>>>> Stopped node: " + id); + } + } + finally { + done.set(true); + + info("Start stop thread finished."); + } + + return null; + } + }, 1, "start-stop-thread"); + + fut1.get(); + fut2.get(); + fut3.get(); + } + finally { + ldr.close(false); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testLoaderApi() throws Exception { + useCache = true; + + try { + Ignite g1 = startGrid(1); + + IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null); + + ldr.close(false); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + ldr.future().get(); + + try { + // Create another loader. + ldr = g1.dataStreamer("UNKNOWN_CACHE"); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + ldr.close(true); + + assert ldr.future().isDone(); + + ldr.future().get(); + + // Create another loader. + ldr = g1.dataStreamer(null); + + // Cancel with future. + ldr.future().cancel(); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + try { + ldr.future().get(); + + assert false; + } + catch (IgniteFutureCancelledException e) { + info("Caught expected exception: " + e); + } + + // Create another loader. + ldr = g1.dataStreamer(null); + + // This will close loader. + stopGrid(getTestGridName(1), false); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + ldr.future().get(); + } + finally { + stopAllGrids(); + } + } + + /** + * Wraps integer to closure returning it. + * + * @param i Value to wrap. + * @return Callable. + */ + private static Callable<Integer> callable(@Nullable final Integer i) { + return new Callable<Integer>() { + @Override public Integer call() throws Exception { + return i; + } + }; + } + + /** + * Wraps integer to closure returning it. + * + * @param i Value to wrap. + * @return Closure. + */ + private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) { + return new IgniteClosure<Integer, Integer>() { + @Override public Integer apply(Integer e) { + return e == null ? i : e + i; + } + }; + } + + /** + * Wraps object to closure returning it. + * + * @param obj Value to wrap. + * @return Closure. + */ + private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) { + return new IgniteClosure<T, T>() { + @Override public T apply(T e) { + assert e == null || obj == null || e.getClass() == obj.getClass() : + "Expects the same types [e=" + e + ", obj=" + obj + ']'; + + return obj; + } + }; + } + + /** + * Wraps integer to closure expecting it and returning {@code null}. + * + * @param exp Expected closure value. + * @return Remove expected cache value closure. + */ + private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) { + return new IgniteClosure<T, T>() { + @Override public T apply(T act) { + if (exp == null ? act == null : exp.equals(act)) + return null; + + throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']'); + } + }; + } + + /** + * @throws Exception If failed. + */ + public void testFlush() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + final IgniteCache<Integer, Integer> c = g.jcache(null); + + final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); + + ldr.perNodeBufferSize(10); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + multithreaded(new Callable<Void>() { + @Override + public Void call() throws Exception { + ldr.flush(); + + assertEquals(9, c.size()); + + return null; + } + }, 5, "flush-checker"); + + ldr.addData(100, 100); + + ldr.flush(); + + assertEquals(10, c.size()); + + ldr.addData(200, 200); + + ldr.close(false); + + ldr.future().get(); + + assertEquals(11, c.size()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testTryFlush() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + IgniteCache<Integer, Integer> c = g.jcache(null); + + IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); + + ldr.perNodeBufferSize(10); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + ldr.tryFlush(); + + Thread.sleep(100); + + assertEquals(9, c.size()); + + ldr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testFlushTimeout() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + final CountDownLatch latch = new CountDownLatch(9); + + g.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + latch.countDown(); + + return true; + } + }, EVT_CACHE_OBJECT_PUT); + + IgniteCache<Integer, Integer> c = g.jcache(null); + + assertTrue(c.localSize() == 0); + + IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); + + ldr.perNodeBufferSize(10); + ldr.autoFlushFrequency(3000); + ldr.allowOverwrite(true); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + assertFalse(latch.await(1000, MILLISECONDS)); + + assertTrue(c.localSize() == 0); + + assertTrue(latch.await(3000, MILLISECONDS)); + + assertEquals(9, c.size()); + + ldr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testUpdateStore() throws Exception { + storeMap = new ConcurrentHashMap<>(); + + try { + store = new TestStore(); + + useCache = true; + + Ignite ignite = startGrid(1); + + startGrid(2); + startGrid(3); + + for (int i = 0; i < 1000; i++) + storeMap.put(i, i); + + try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) { + ldr.allowOverwrite(true); + + assertFalse(ldr.skipStore()); + + for (int i = 0; i < 1000; i++) + ldr.removeData(i); + + for (int i = 1000; i < 2000; i++) + ldr.addData(i, i); + } + + for (int i = 0; i < 1000; i++) + assertNull(storeMap.get(i)); + + for (int i = 1000; i < 2000; i++) + assertEquals(i, storeMap.get(i)); + + try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) { + ldr.allowOverwrite(true); + + ldr.skipStore(true); + + for (int i = 0; i < 1000; i++) + ldr.addData(i, i); + + for (int i = 1000; i < 2000; i++) + ldr.removeData(i); + } + + IgniteCache<Object, Object> cache = ignite.jcache(null); + + for (int i = 0; i < 1000; i++) { + assertNull(storeMap.get(i)); + + assertEquals(i, cache.get(i)); + } + + for (int i = 1000; i < 2000; i++) { + assertEquals(i, storeMap.get(i)); + + assertNull(cache.localPeek(i, CachePeekMode.ONHEAP)); + } + } + finally { + storeMap = null; + } + } + + /** + * @throws Exception If failed. + */ + public void testCustomUserUpdater() throws Exception { + useCache = true; + + try { + Ignite ignite = startGrid(1); + + startGrid(2); + startGrid(3); + + try (IgniteDataStreamer<String, TestObject> ldr = ignite.dataStreamer(null)) { + ldr.allowOverwrite(true); + + ldr.updater(new IgniteDataStreamer.Updater<String, TestObject>() { + @Override public void update(IgniteCache<String, TestObject> cache, + Collection<Map.Entry<String, TestObject>> entries) { + for (Map.Entry<String, TestObject> e : entries) { + assertTrue(e.getKey() instanceof String); + assertTrue(e.getValue() instanceof TestObject); + + cache.put(e.getKey(), new TestObject(e.getValue().val + 1)); + } + } + }); + + for (int i = 0; i < 100; i++) + ldr.addData(String.valueOf(i), new TestObject(i)); + } + + IgniteCache<String, TestObject> cache = ignite.jcache(null); + + for (int i = 0; i < 100; i++) { + TestObject val = cache.get(String.valueOf(i)); + + assertNotNull(val); + assertEquals(i + 1, val.val); + } + } + finally { + stopAllGrids(); + } + } + + /** + * + */ + private static class TestObject { + /** Value. */ + private final int val; + + /** + * @param val Value. + */ + private TestObject(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestObject obj = (TestObject)o; + + return val == obj.val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + } + + /** + * + */ + private static class TestStore extends CacheStoreAdapter<Object, Object> { + /** {@inheritDoc} */ + @Nullable @Override public Object load(Object key) { + return storeMap.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) { + storeMap.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + storeMap.remove(key); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java deleted file mode 100644 index f8f59df..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.datastream; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; - -/** - * Tests for {@code IgniteDataStreamerImpl}. - */ -public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Number of keys to load via data streamer. */ - private static final int KEYS_COUNT = 1000; - - /** Started grid counter. */ - private static int cnt; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - // Forth node goes without cache. - if (cnt < 4) - cfg.setCacheConfiguration(cacheConfiguration()); - - cnt++; - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testNullPointerExceptionUponDataStreamerClosing() throws Exception { - try { - startGrids(5); - - final CyclicBarrier barrier = new CyclicBarrier(2); - - multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - U.awaitQuiet(barrier); - - G.stopAll(true); - - return null; - } - }, 1); - - Ignite g4 = grid(4); - - IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null); - - dataLdr.perNodeBufferSize(32); - - for (int i = 0; i < 100000; i += 2) { - dataLdr.addData(i, i); - dataLdr.removeData(i + 1); - } - - U.awaitQuiet(barrier); - - info("Closing data streamer."); - - try { - dataLdr.close(true); - } - catch (IllegalStateException ignore) { - // This is ok to ignore this exception as test is racy by it's nature - - // grid is stopping in different thread. - } - } - finally { - G.stopAll(true); - } - } - - /** - * Data streamer should correctly load entries from HashMap in case of grids with more than one node - * and with GridOptimizedMarshaller that requires serializable. - * - * @throws Exception If failed. - */ - public void testAddDataFromMap() throws Exception { - try { - cnt = 0; - - startGrids(2); - - Ignite g0 = grid(0); - - IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null); - - Map<Integer, String> map = U.newHashMap(KEYS_COUNT); - - for (int i = 0; i < KEYS_COUNT; i ++) - map.put(i, String.valueOf(i)); - - dataLdr.addData(map); - - dataLdr.close(); - - Random rnd = new Random(); - - IgniteCache<Integer, String> c = g0.jcache(null); - - for (int i = 0; i < KEYS_COUNT; i ++) { - Integer k = rnd.nextInt(KEYS_COUNT); - - String v = c.get(k); - - assertEquals(k.toString(), v); - } - } - finally { - G.stopAll(true); - } - } - - /** - * Gets cache configuration. - * - * @return Cache configuration. - */ - private CacheConfiguration cacheConfiguration() { - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setBackups(1); - cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - - return cacheCfg; - } - - /** - * - */ - private static class TestObject implements Serializable { - /** */ - private int val; - - /** - */ - private TestObject() { - // No-op. - } - - /** - * @param val Value. - */ - private TestObject(int val) { - this.val = val; - } - - public Integer val() { - return val; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return val; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj instanceof TestObject && ((TestObject)obj).val == val; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java index c27e7c6..053228c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java @@ -138,7 +138,7 @@ public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest { final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null); ldr.perNodeBufferSize(8192); - ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, String>batchedSorted()); + ldr.updater(DataStreamerCacheUpdaters.<Integer, String>batchedSorted()); ldr.autoFlushFrequency(0); final LongAdder cnt = new LongAdder(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java deleted file mode 100644 index 037c55b..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java +++ /dev/null @@ -1,970 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.datastream; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.configuration.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.events.EventType.*; - -/** - * - */ -public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest { - /** */ - private static ConcurrentHashMap<Object, Object> storeMap; - - /** */ - private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ - private CacheMode mode = PARTITIONED; - - /** */ - private boolean nearEnabled = true; - - /** */ - private boolean useCache; - - /** */ - private TestStore store; - - /** {@inheritDoc} */ - @Override public void afterTest() throws Exception { - super.afterTest(); - - useCache = false; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"IfMayBeConditional", "unchecked"}) - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(spi); - - cfg.setIncludeProperties(); - - cfg.setMarshaller(new OptimizedMarshaller(false)); - - if (useCache) { - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(mode); - cc.setAtomicityMode(TRANSACTIONAL); - cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); - cc.setWriteSynchronizationMode(FULL_SYNC); - - cc.setEvictSynchronized(false); - cc.setEvictNearSynchronized(false); - - if (store != null) { - cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); - cc.setReadThrough(true); - cc.setWriteThrough(true); - } - - cfg.setCacheConfiguration(cc); - } - else - cfg.setCacheConfiguration(); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testPartitioned() throws Exception { - mode = PARTITIONED; - - checkDataStreamer(); - } - - /** - * @throws Exception If failed. - */ - public void testColocated() throws Exception { - mode = PARTITIONED; - nearEnabled = false; - - checkDataStreamer(); - } - - /** - * @throws Exception If failed. - */ - public void testReplicated() throws Exception { - mode = REPLICATED; - - checkDataStreamer(); - } - - /** - * @throws Exception If failed. - */ - public void testLocal() throws Exception { - mode = LOCAL; - - try { - checkDataStreamer(); - - assert false; - } - catch (IgniteCheckedException e) { - // Cannot load local cache configured remotely. - info("Caught expected exception: " + e); - } - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("ErrorNotRethrown") - private void checkDataStreamer() throws Exception { - try { - Ignite g1 = startGrid(1); - - useCache = true; - - Ignite g2 = startGrid(2); - startGrid(3); - - final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); - - ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); - - final AtomicInteger idxGen = new AtomicInteger(); - final int cnt = 400; - final int threads = 10; - - final CountDownLatch l1 = new CountDownLatch(threads); - - IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); - - for (int i = 0; i < cnt; i++) { - int idx = idxGen.getAndIncrement(); - - futs.add(ldr.addData(idx, idx)); - } - - l1.countDown(); - - for (IgniteFuture<?> fut : futs) - fut.get(); - - return null; - } - }, threads); - - l1.await(); - - // This will wait until data streamer finishes loading. - stopGrid(getTestGridName(1), false); - - f1.get(); - - int s2 = internalCache(2).primaryKeySet().size(); - int s3 = internalCache(3).primaryKeySet().size(); - int total = threads * cnt; - - assertEquals(total, s2 + s3); - - final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null); - - rmvLdr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); - - final CountDownLatch l2 = new CountDownLatch(threads); - - IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); - - for (int i = 0; i < cnt; i++) { - final int key = idxGen.decrementAndGet(); - - futs.add(rmvLdr.removeData(key)); - } - - l2.countDown(); - - for (IgniteFuture<?> fut : futs) - fut.get(); - - return null; - } - }, threads); - - l2.await(); - - rmvLdr.close(false); - - f2.get(); - - s2 = internalCache(2).primaryKeySet().size(); - s3 = internalCache(3).primaryKeySet().size(); - - assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']'; - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testPartitionedIsolated() throws Exception { - mode = PARTITIONED; - - checkIsolatedDataStreamer(); - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedIsolated() throws Exception { - mode = REPLICATED; - - checkIsolatedDataStreamer(); - } - - /** - * @throws Exception If failed. - */ - private void checkIsolatedDataStreamer() throws Exception { - try { - useCache = true; - - Ignite g1 = startGrid(0); - startGrid(1); - startGrid(2); - - awaitPartitionMapExchange(); - - IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - - for (int i = 0; i < 100; i++) - cache.put(i, -1); - - final int cnt = 40_000; - final int threads = 10; - - try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) { - final AtomicInteger idxGen = new AtomicInteger(); - - IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - for (int i = 0; i < cnt; i++) { - int idx = idxGen.getAndIncrement(); - - ldr.addData(idx, idx); - } - - return null; - } - }, threads); - - f1.get(); - } - - for (int g = 0; g < 3; g++) { - ClusterNode locNode = grid(g).localNode(); - - GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null); - - if (cache0.isNear()) - cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht(); - - CacheAffinity<Integer> aff = cache0.affinity(); - - for (int key = 0; key < cnt * threads; key++) { - if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) { - GridCacheEntryEx entry = cache0.peekEx(key); - - assertNotNull("Missing entry for key: " + key, entry); - assertEquals((key < 100 ? -1 : key), - CU.value(entry.rawGetOrUnmarshal(false), cache0.context(), false)); - } - } - } - } - finally { - stopAllGrids(); - } - } - - /** - * Test primitive arrays can be passed into data streamer. - * - * @throws Exception If failed. - */ - public void testPrimitiveArrays() throws Exception { - try { - useCache = true; - mode = PARTITIONED; - - Ignite g1 = startGrid(1); - startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used). - - List<Object> arrays = Arrays.<Object>asList( - new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4}, - new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8}); - - IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null); - - for (int i = 0, size = arrays.size(); i < 1000; i++) { - Object arr = arrays.get(i % size); - - dataLdr.addData(i, arr); - dataLdr.addData(i, fixedClosure(arr)); - } - - dataLdr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedMultiThreaded() throws Exception { - mode = REPLICATED; - - checkLoaderMultithreaded(1, 2); - } - - /** - * @throws Exception If failed. - */ - public void testPartitionedMultiThreaded() throws Exception { - mode = PARTITIONED; - - checkLoaderMultithreaded(1, 3); - } - - /** - * Tests loader in multithreaded environment with various count of grids started. - * - * @param nodesCntNoCache How many nodes should be started without cache. - * @param nodesCntCache How many nodes should be started with cache. - * @throws Exception If failed. - */ - protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache) - throws Exception { - try { - // Start all required nodes. - int idx = 1; - - for (int i = 0; i < nodesCntNoCache; i++) - startGrid(idx++); - - useCache = true; - - for (int i = 0; i < nodesCntCache; i++) - startGrid(idx++); - - Ignite g1 = grid(1); - - // Get and configure loader. - final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); - - ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>individual()); - ldr.perNodeBufferSize(2); - - // Define count of puts. - final AtomicInteger idxGen = new AtomicInteger(); - - final AtomicBoolean done = new AtomicBoolean(); - - try { - final int totalPutCnt = 50000; - - IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Collection<IgniteFuture<?>> futs = new ArrayList<>(); - - while (!done.get()) { - int idx = idxGen.getAndIncrement(); - - if (idx >= totalPutCnt) { - info(">>> Stopping producer thread since maximum count of puts reached."); - - break; - } - - futs.add(ldr.addData(idx, idx)); - } - - ldr.flush(); - - for (IgniteFuture<?> fut : futs) - fut.get(); - - return null; - } - }, 5, "producer"); - - IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!done.get()) { - ldr.flush(); - - U.sleep(100); - } - - return null; - } - }, 1, "flusher"); - - // Define index of node being restarted. - final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1; - - IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - try { - for (int i = 0; i < 5; i++) { - Ignite g = startGrid(restartNodeIdx); - - UUID id = g.cluster().localNode().id(); - - info(">>>>>>> Started node: " + id); - - U.sleep(1000); - - stopGrid(getTestGridName(restartNodeIdx), true); - - info(">>>>>>> Stopped node: " + id); - } - } - finally { - done.set(true); - - info("Start stop thread finished."); - } - - return null; - } - }, 1, "start-stop-thread"); - - fut1.get(); - fut2.get(); - fut3.get(); - } - finally { - ldr.close(false); - } - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testLoaderApi() throws Exception { - useCache = true; - - try { - Ignite g1 = startGrid(1); - - IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null); - - ldr.close(false); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - ldr.future().get(); - - try { - // Create another loader. - ldr = g1.dataStreamer("UNKNOWN_CACHE"); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - ldr.close(true); - - assert ldr.future().isDone(); - - ldr.future().get(); - - // Create another loader. - ldr = g1.dataStreamer(null); - - // Cancel with future. - ldr.future().cancel(); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - try { - ldr.future().get(); - - assert false; - } - catch (IgniteFutureCancelledException e) { - info("Caught expected exception: " + e); - } - - // Create another loader. - ldr = g1.dataStreamer(null); - - // This will close loader. - stopGrid(getTestGridName(1), false); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - ldr.future().get(); - } - finally { - stopAllGrids(); - } - } - - /** - * Wraps integer to closure returning it. - * - * @param i Value to wrap. - * @return Callable. - */ - private static Callable<Integer> callable(@Nullable final Integer i) { - return new Callable<Integer>() { - @Override public Integer call() throws Exception { - return i; - } - }; - } - - /** - * Wraps integer to closure returning it. - * - * @param i Value to wrap. - * @return Closure. - */ - private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) { - return new IgniteClosure<Integer, Integer>() { - @Override public Integer apply(Integer e) { - return e == null ? i : e + i; - } - }; - } - - /** - * Wraps object to closure returning it. - * - * @param obj Value to wrap. - * @return Closure. - */ - private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) { - return new IgniteClosure<T, T>() { - @Override public T apply(T e) { - assert e == null || obj == null || e.getClass() == obj.getClass() : - "Expects the same types [e=" + e + ", obj=" + obj + ']'; - - return obj; - } - }; - } - - /** - * Wraps integer to closure expecting it and returning {@code null}. - * - * @param exp Expected closure value. - * @return Remove expected cache value closure. - */ - private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) { - return new IgniteClosure<T, T>() { - @Override public T apply(T act) { - if (exp == null ? act == null : exp.equals(act)) - return null; - - throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']'); - } - }; - } - - /** - * @throws Exception If failed. - */ - public void testFlush() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - final IgniteCache<Integer, Integer> c = g.jcache(null); - - final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); - - ldr.perNodeBufferSize(10); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.localSize() == 0); - - multithreaded(new Callable<Void>() { - @Override - public Void call() throws Exception { - ldr.flush(); - - assertEquals(9, c.size()); - - return null; - } - }, 5, "flush-checker"); - - ldr.addData(100, 100); - - ldr.flush(); - - assertEquals(10, c.size()); - - ldr.addData(200, 200); - - ldr.close(false); - - ldr.future().get(); - - assertEquals(11, c.size()); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTryFlush() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - IgniteCache<Integer, Integer> c = g.jcache(null); - - IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); - - ldr.perNodeBufferSize(10); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.localSize() == 0); - - ldr.tryFlush(); - - Thread.sleep(100); - - assertEquals(9, c.size()); - - ldr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testFlushTimeout() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - final CountDownLatch latch = new CountDownLatch(9); - - g.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - latch.countDown(); - - return true; - } - }, EVT_CACHE_OBJECT_PUT); - - IgniteCache<Integer, Integer> c = g.jcache(null); - - assertTrue(c.localSize() == 0); - - IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); - - ldr.perNodeBufferSize(10); - ldr.autoFlushFrequency(3000); - ldr.allowOverwrite(true); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.localSize() == 0); - - assertFalse(latch.await(1000, MILLISECONDS)); - - assertTrue(c.localSize() == 0); - - assertTrue(latch.await(3000, MILLISECONDS)); - - assertEquals(9, c.size()); - - ldr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testUpdateStore() throws Exception { - storeMap = new ConcurrentHashMap<>(); - - try { - store = new TestStore(); - - useCache = true; - - Ignite ignite = startGrid(1); - - startGrid(2); - startGrid(3); - - for (int i = 0; i < 1000; i++) - storeMap.put(i, i); - - try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) { - ldr.allowOverwrite(true); - - assertFalse(ldr.skipStore()); - - for (int i = 0; i < 1000; i++) - ldr.removeData(i); - - for (int i = 1000; i < 2000; i++) - ldr.addData(i, i); - } - - for (int i = 0; i < 1000; i++) - assertNull(storeMap.get(i)); - - for (int i = 1000; i < 2000; i++) - assertEquals(i, storeMap.get(i)); - - try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) { - ldr.allowOverwrite(true); - - ldr.skipStore(true); - - for (int i = 0; i < 1000; i++) - ldr.addData(i, i); - - for (int i = 1000; i < 2000; i++) - ldr.removeData(i); - } - - IgniteCache<Object, Object> cache = ignite.jcache(null); - - for (int i = 0; i < 1000; i++) { - assertNull(storeMap.get(i)); - - assertEquals(i, cache.get(i)); - } - - for (int i = 1000; i < 2000; i++) { - assertEquals(i, storeMap.get(i)); - - assertNull(cache.localPeek(i, CachePeekMode.ONHEAP)); - } - } - finally { - storeMap = null; - } - } - - /** - * @throws Exception If failed. - */ - public void testCustomUserUpdater() throws Exception { - useCache = true; - - try { - Ignite ignite = startGrid(1); - - startGrid(2); - startGrid(3); - - try (IgniteDataStreamer<String, TestObject> ldr = ignite.dataStreamer(null)) { - ldr.allowOverwrite(true); - - ldr.updater(new IgniteDataStreamer.Updater<String, TestObject>() { - @Override public void update(IgniteCache<String, TestObject> cache, - Collection<Map.Entry<String, TestObject>> entries) { - for (Map.Entry<String, TestObject> e : entries) { - assertTrue(e.getKey() instanceof String); - assertTrue(e.getValue() instanceof TestObject); - - cache.put(e.getKey(), new TestObject(e.getValue().val + 1)); - } - } - }); - - for (int i = 0; i < 100; i++) - ldr.addData(String.valueOf(i), new TestObject(i)); - } - - IgniteCache<String, TestObject> cache = ignite.jcache(null); - - for (int i = 0; i < 100; i++) { - TestObject val = cache.get(String.valueOf(i)); - - assertNotNull(val); - assertEquals(i + 1, val.val); - } - } - finally { - stopAllGrids(); - } - } - - /** - * - */ - private static class TestObject { - /** Value. */ - private final int val; - - /** - * @param val Value. - */ - private TestObject(int val) { - this.val = val; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - TestObject obj = (TestObject)o; - - return val == obj.val; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return val; - } - } - - /** - * - */ - private static class TestStore extends CacheStoreAdapter<Object, Object> { - /** {@inheritDoc} */ - @Nullable @Override public Object load(Object key) { - return storeMap.get(key); - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry<?, ?> entry) { - storeMap.put(entry.getKey(), entry.getValue()); - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) { - storeMap.remove(key); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index c20d6dc..96e041c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.distributed.replicated.*; import org.apache.ignite.internal.processors.cache.distributed.replicated.preloader.*; -import org.apache.ignite.internal.processors.cache.expiry.*; import org.apache.ignite.internal.processors.cache.integration.*; import org.apache.ignite.internal.processors.cache.local.*; import org.apache.ignite.internal.processors.datastream.*; @@ -107,8 +106,8 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheBalancingStoreSelfTest.class); suite.addTestSuite(GridCacheAffinityApiSelfTest.class); suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class); - suite.addTestSuite(IgniteDataStreamerProcessorSelfTest.class); - suite.addTestSuite(IgniteDataStreamerImplSelfTest.class); + suite.addTestSuite(DataStreamerProcessorSelfTest.class); + suite.addTestSuite(DataStreamerImplSelfTest.class); suite.addTestSuite(GridCacheEntryMemorySizeSelfTest.class); suite.addTestSuite(GridCacheClearAllSelfTest.class); suite.addTestSuite(GridCacheObjectToStringSelfTest.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c832901..ff9e63e 100644 --- a/pom.xml +++ b/pom.xml @@ -757,7 +757,7 @@ </group> <group> <title>Data Grid APIs</title> - <packages>org.apache.ignite.cache:org.apache.ignite.transactions:org.apache.ignite.datastructures:org.apache.ignite.cache.store:org.apache.ignite.cache.store.hibernate:org.apache.ignite.cache.store.jdbc:org.apache.ignite.cache.query:org.apache.ignite.cache.query.annotations:org.apache.ignite.cache.affinity:org.apache.ignite.cache.affinity.consistenthash:org.apache.ignite.cache.affinity.rendezvous:org.apache.ignite.cache.affinity.fair:org.apache.ignite.cache.eviction:org.apache.ignite.cache.eviction.fifo:org.apache.ignite.cache.eviction.igfs:org.apache.ignite.cache.eviction.lru:org.apache.ignite.cache.eviction.random:org.apache.ignite.cache.jta:org.apache.ignite.cache.jta.jndi:org.apache.ignite.cache.jta.reflect:org.apache.ignite.cache.websession:org.apache.ignite.cache.hibernate:org.apache.ignite.dataload</packages> + <packages>org.apache.ignite.cache:org.apache.ignite.transactions:org.apache.ignite.datastructures:org.apache.ignite.cache.store:org.apache.ignite.cache.store.hibernate:org.apache.ignite.cache.store.jdbc:org.apache.ignite.cache.query:org.apache.ignite.cache.query.annotations:org.apache.ignite.cache.affinity:org.apache.ignite.cache.affinity.consistenthash:org.apache.ignite.cache.affinity.rendezvous:org.apache.ignite.cache.affinity.fair:org.apache.ignite.cache.eviction:org.apache.ignite.cache.eviction.fifo:org.apache.ignite.cache.eviction.igfs:org.apache.ignite.cache.eviction.lru:org.apache.ignite.cache.eviction.random:org.apache.ignite.cache.jta:org.apache.ignite.cache.jta.jndi:org.apache.ignite.cache.jta.reflect:org.apache.ignite.cache.websession:org.apache.ignite.cache.hibernate:org.apache.ignite.datastreamer</packages> </group> <group> <title>Service Grid APIs</title> @@ -952,7 +952,7 @@ </group> <group> <title>Data Grid APIs</title> - <packages>org.apache.ignite.cache:org.apache.ignite.transactions:org.apache.ignite.datastructures:org.apache.ignite.cache.store:org.apache.ignite.cache.store.hibernate:org.apache.ignite.cache.store.jdbc:org.apache.ignite.cache.store.jdbc.dialect:org.apache.ignite.cache.query:org.apache.ignite.cache.query.annotations:org.apache.ignite.cache.affinity:org.apache.ignite.cache.affinity.consistenthash:org.apache.ignite.cache.affinity.rendezvous:org.apache.ignite.cache.affinity.fair:org.apache.ignite.cache.eviction:org.apache.ignite.cache.eviction.fifo:org.apache.ignite.cache.eviction.igfs:org.apache.ignite.cache.eviction.lru:org.apache.ignite.cache.eviction.random:org.apache.ignite.cache.jta:org.apache.ignite.cache.jta.jndi:org.apache.ignite.cache.jta.reflect:org.apache.ignite.cache.websession:org.apache.ignite.cache.hibernate:org.apache.ignite.dataload</packages> + <packages>org.apache.ignite.cache:org.apache.ignite.transactions:org.apache.ignite.datastructures:org.apache.ignite.cache.store:org.apache.ignite.cache.store.hibernate:org.apache.ignite.cache.store.jdbc:org.apache.ignite.cache.store.jdbc.dialect:org.apache.ignite.cache.query:org.apache.ignite.cache.query.annotations:org.apache.ignite.cache.affinity:org.apache.ignite.cache.affinity.consistenthash:org.apache.ignite.cache.affinity.rendezvous:org.apache.ignite.cache.affinity.fair:org.apache.ignite.cache.eviction:org.apache.ignite.cache.eviction.fifo:org.apache.ignite.cache.eviction.igfs:org.apache.ignite.cache.eviction.lru:org.apache.ignite.cache.eviction.random:org.apache.ignite.cache.jta:org.apache.ignite.cache.jta.jndi:org.apache.ignite.cache.jta.reflect:org.apache.ignite.cache.websession:org.apache.ignite.cache.hibernate:org.apache.ignite.datastreamer</packages> </group> <group> <title>Service Grid APIs</title>