http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java index 0000000,e6da0d1..1a386cb mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java @@@ -1,0 -1,970 +1,974 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.processors.datastreamer; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.cache.affinity.*; + import org.apache.ignite.cache.store.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.distributed.near.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.marshaller.optimized.*; + import org.apache.ignite.spi.discovery.tcp.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.testframework.junits.common.*; + import org.jetbrains.annotations.*; + + import javax.cache.*; + import javax.cache.configuration.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static java.util.concurrent.TimeUnit.*; + import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; + import static org.apache.ignite.cache.CacheMode.*; + import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + import static org.apache.ignite.events.EventType.*; + + /** + * + */ + public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { + /** */ + private static ConcurrentHashMap<Object, Object> storeMap; + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private CacheMode mode = PARTITIONED; + + /** */ + private boolean nearEnabled = true; + + /** */ + private boolean useCache; + + /** */ + private TestStore store; + + /** {@inheritDoc} */ + @Override public void afterTest() throws Exception { + super.afterTest(); + + useCache = false; + } + + /** 
{@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "unchecked"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + cfg.setIncludeProperties(); + + cfg.setMarshaller(new OptimizedMarshaller(false)); + + if (useCache) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(mode); + cc.setAtomicityMode(TRANSACTIONAL); - cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); ++ ++ if (nearEnabled) { ++ NearCacheConfiguration nearCfg = new NearCacheConfiguration(); ++ ++ cc.setNearConfiguration(nearCfg); ++ } ++ + cc.setWriteSynchronizationMode(FULL_SYNC); + + cc.setEvictSynchronized(false); - cc.setEvictNearSynchronized(false); + + if (store != null) { + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + } + + cfg.setCacheConfiguration(cc); + } + else + cfg.setCacheConfiguration(); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPartitioned() throws Exception { + mode = PARTITIONED; + + checkDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testColocated() throws Exception { + mode = PARTITIONED; + nearEnabled = false; + + checkDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicated() throws Exception { + mode = REPLICATED; + + checkDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testLocal() throws Exception { + mode = LOCAL; + + try { + checkDataStreamer(); + + assert false; + } + catch (IgniteCheckedException e) { + // Cannot load local cache configured remotely. + info("Caught expected exception: " + e); + } + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings("ErrorNotRethrown") + private void checkDataStreamer() throws Exception { + try { + Ignite g1 = startGrid(1); + + useCache = true; + + Ignite g2 = startGrid(2); + startGrid(3); + + final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); + + ldr.updater(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); + + final AtomicInteger idxGen = new AtomicInteger(); + final int cnt = 400; + final int threads = 10; + + final CountDownLatch l1 = new CountDownLatch(threads); + + IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + int idx = idxGen.getAndIncrement(); + + futs.add(ldr.addData(idx, idx)); + } + + l1.countDown(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, threads); + + l1.await(); + + // This will wait until data streamer finishes loading. 
+ stopGrid(getTestGridName(1), false); + + f1.get(); + + int s2 = internalCache(2).primaryKeySet().size(); + int s3 = internalCache(3).primaryKeySet().size(); + int total = threads * cnt; + + assertEquals(total, s2 + s3); + + final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null); + + rmvLdr.updater(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); + + final CountDownLatch l2 = new CountDownLatch(threads); + + IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + final int key = idxGen.decrementAndGet(); + + futs.add(rmvLdr.removeData(key)); + } + + l2.countDown(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, threads); + + l2.await(); + + rmvLdr.close(false); + + f2.get(); + + s2 = internalCache(2).primaryKeySet().size(); + s3 = internalCache(3).primaryKeySet().size(); + + assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']'; + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedIsolated() throws Exception { + mode = PARTITIONED; + + checkIsolatedDataStreamer(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedIsolated() throws Exception { + mode = REPLICATED; + + checkIsolatedDataStreamer(); + } + + /** + * @throws Exception If failed. 
+ */ + private void checkIsolatedDataStreamer() throws Exception { + try { + useCache = true; + + Ignite g1 = startGrid(0); + startGrid(1); + startGrid(2); + + awaitPartitionMapExchange(); + + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + for (int i = 0; i < 100; i++) + cache.put(i, -1); + + final int cnt = 40_000; + final int threads = 10; + + try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) { + final AtomicInteger idxGen = new AtomicInteger(); + + IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < cnt; i++) { + int idx = idxGen.getAndIncrement(); + + ldr.addData(idx, idx); + } + + return null; + } + }, threads); + + f1.get(); + } + + for (int g = 0; g < 3; g++) { + ClusterNode locNode = grid(g).localNode(); + + GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null); + + if (cache0.isNear()) + cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht(); + + CacheAffinity<Integer> aff = cache0.affinity(); + + for (int key = 0; key < cnt * threads; key++) { + if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) { + GridCacheEntryEx entry = cache0.peekEx(key); + + assertNotNull("Missing entry for key: " + key, entry); + assertEquals((key < 100 ? -1 : key), + CU.value(entry.rawGetOrUnmarshal(false), cache0.context(), false)); + } + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * Test primitive arrays can be passed into data streamer. + * + * @throws Exception If failed. + */ + public void testPrimitiveArrays() throws Exception { + try { + useCache = true; + mode = PARTITIONED; + + Ignite g1 = startGrid(1); + startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used). 
+ + List<Object> arrays = Arrays.<Object>asList( + new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4}, + new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8}); + + IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null); + + for (int i = 0, size = arrays.size(); i < 1000; i++) { + Object arr = arrays.get(i % size); + + dataLdr.addData(i, arr); + dataLdr.addData(i, fixedClosure(arr)); + } + + dataLdr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedMultiThreaded() throws Exception { + mode = REPLICATED; + + checkLoaderMultithreaded(1, 2); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedMultiThreaded() throws Exception { + mode = PARTITIONED; + + checkLoaderMultithreaded(1, 3); + } + + /** + * Tests loader in multithreaded environment with various count of grids started. + * + * @param nodesCntNoCache How many nodes should be started without cache. + * @param nodesCntCache How many nodes should be started with cache. + * @throws Exception If failed. + */ + protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache) + throws Exception { + try { + // Start all required nodes. + int idx = 1; + + for (int i = 0; i < nodesCntNoCache; i++) + startGrid(idx++); + + useCache = true; + + for (int i = 0; i < nodesCntCache; i++) + startGrid(idx++); + + Ignite g1 = grid(1); + + // Get and configure loader. + final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); + + ldr.updater(DataStreamerCacheUpdaters.<Integer, Integer>individual()); + ldr.perNodeBufferSize(2); + + // Define count of puts. 
+ final AtomicInteger idxGen = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + try { + final int totalPutCnt = 50000; + + IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(); + + while (!done.get()) { + int idx = idxGen.getAndIncrement(); + + if (idx >= totalPutCnt) { + info(">>> Stopping producer thread since maximum count of puts reached."); + + break; + } + + futs.add(ldr.addData(idx, idx)); + } + + ldr.flush(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, 5, "producer"); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!done.get()) { + ldr.flush(); + + U.sleep(100); + } + + return null; + } + }, 1, "flusher"); + + // Define index of node being restarted. + final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1; + + IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + for (int i = 0; i < 5; i++) { + Ignite g = startGrid(restartNodeIdx); + + UUID id = g.cluster().localNode().id(); + + info(">>>>>>> Started node: " + id); + + U.sleep(1000); + + stopGrid(getTestGridName(restartNodeIdx), true); + + info(">>>>>>> Stopped node: " + id); + } + } + finally { + done.set(true); + + info("Start stop thread finished."); + } + + return null; + } + }, 1, "start-stop-thread"); + + fut1.get(); + fut2.get(); + fut3.get(); + } + finally { + ldr.close(false); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testLoaderApi() throws Exception { + useCache = true; + + try { + Ignite g1 = startGrid(1); + + IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null); + + ldr.close(false); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + ldr.future().get(); + + try { + // Create another loader. + ldr = g1.dataStreamer("UNKNOWN_CACHE"); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + ldr.close(true); + + assert ldr.future().isDone(); + + ldr.future().get(); + + // Create another loader. + ldr = g1.dataStreamer(null); + + // Cancel with future. + ldr.future().cancel(); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + try { + ldr.future().get(); + + assert false; + } + catch (IgniteFutureCancelledException e) { + info("Caught expected exception: " + e); + } + + // Create another loader. + ldr = g1.dataStreamer(null); + + // This will close loader. + stopGrid(getTestGridName(1), false); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + ldr.future().get(); + } + finally { + stopAllGrids(); + } + } + + /** + * Wraps integer to closure returning it. + * + * @param i Value to wrap. + * @return Callable. + */ + private static Callable<Integer> callable(@Nullable final Integer i) { + return new Callable<Integer>() { + @Override public Integer call() throws Exception { + return i; + } + }; + } + + /** + * Wraps integer to closure returning it. + * + * @param i Value to wrap. + * @return Closure. 
+ */ + private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) { + return new IgniteClosure<Integer, Integer>() { + @Override public Integer apply(Integer e) { + return e == null ? i : e + i; + } + }; + } + + /** + * Wraps object to closure returning it. + * + * @param obj Value to wrap. + * @return Closure. + */ + private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) { + return new IgniteClosure<T, T>() { + @Override public T apply(T e) { + assert e == null || obj == null || e.getClass() == obj.getClass() : + "Expects the same types [e=" + e + ", obj=" + obj + ']'; + + return obj; + } + }; + } + + /** + * Wraps integer to closure expecting it and returning {@code null}. + * + * @param exp Expected closure value. + * @return Remove expected cache value closure. + */ + private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) { + return new IgniteClosure<T, T>() { + @Override public T apply(T act) { + if (exp == null ? act == null : exp.equals(act)) + return null; + + throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']'); + } + }; + } + + /** + * @throws Exception If failed. 
+ */ + public void testFlush() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + final IgniteCache<Integer, Integer> c = g.jcache(null); + + final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); + + ldr.perNodeBufferSize(10); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + multithreaded(new Callable<Void>() { + @Override + public Void call() throws Exception { + ldr.flush(); + + assertEquals(9, c.size()); + + return null; + } + }, 5, "flush-checker"); + + ldr.addData(100, 100); + + ldr.flush(); + + assertEquals(10, c.size()); + + ldr.addData(200, 200); + + ldr.close(false); + + ldr.future().get(); + + assertEquals(11, c.size()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testTryFlush() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + IgniteCache<Integer, Integer> c = g.jcache(null); + + IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); + + ldr.perNodeBufferSize(10); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + ldr.tryFlush(); + + Thread.sleep(100); + + assertEquals(9, c.size()); + + ldr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testFlushTimeout() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + final CountDownLatch latch = new CountDownLatch(9); + + g.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + latch.countDown(); + + return true; + } + }, EVT_CACHE_OBJECT_PUT); + + IgniteCache<Integer, Integer> c = g.jcache(null); + + assertTrue(c.localSize() == 0); + + IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); + + ldr.perNodeBufferSize(10); + ldr.autoFlushFrequency(3000); + ldr.allowOverwrite(true); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + assertFalse(latch.await(1000, MILLISECONDS)); + + assertTrue(c.localSize() == 0); + + assertTrue(latch.await(3000, MILLISECONDS)); + + assertEquals(9, c.size()); + + ldr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testUpdateStore() throws Exception { + storeMap = new ConcurrentHashMap<>(); + + try { + store = new TestStore(); + + useCache = true; + + Ignite ignite = startGrid(1); + + startGrid(2); + startGrid(3); + + for (int i = 0; i < 1000; i++) + storeMap.put(i, i); + + try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) { + ldr.allowOverwrite(true); + + assertFalse(ldr.skipStore()); + + for (int i = 0; i < 1000; i++) + ldr.removeData(i); + + for (int i = 1000; i < 2000; i++) + ldr.addData(i, i); + } + + for (int i = 0; i < 1000; i++) + assertNull(storeMap.get(i)); + + for (int i = 1000; i < 2000; i++) + assertEquals(i, storeMap.get(i)); + + try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) { + ldr.allowOverwrite(true); + + ldr.skipStore(true); + + for (int i = 0; i < 1000; i++) + ldr.addData(i, i); + + for (int i = 1000; i < 2000; i++) + ldr.removeData(i); + } + + IgniteCache<Object, Object> cache = ignite.jcache(null); + + for (int i = 0; i < 1000; i++) { + 
assertNull(storeMap.get(i)); + + assertEquals(i, cache.get(i)); + } + + for (int i = 1000; i < 2000; i++) { + assertEquals(i, storeMap.get(i)); + + assertNull(cache.localPeek(i, CachePeekMode.ONHEAP)); + } + } + finally { + // Stop the grids started by this test, as the other tests in this class do, before dropping the map the store reads. + stopAllGrids(); + + storeMap = null; + } + } + + /** + * @throws Exception If failed. + */ + public void testCustomUserUpdater() throws Exception { + useCache = true; + + try { + Ignite ignite = startGrid(1); + + startGrid(2); + startGrid(3); + + try (IgniteDataStreamer<String, TestObject> ldr = ignite.dataStreamer(null)) { + ldr.allowOverwrite(true); + + ldr.updater(new IgniteDataStreamer.Updater<String, TestObject>() { + @Override public void update(IgniteCache<String, TestObject> cache, + Collection<Map.Entry<String, TestObject>> entries) { + for (Map.Entry<String, TestObject> e : entries) { + assertTrue(e.getKey() instanceof String); + assertTrue(e.getValue() instanceof TestObject); + + cache.put(e.getKey(), new TestObject(e.getValue().val + 1)); + } + } + }); + + for (int i = 0; i < 100; i++) + ldr.addData(String.valueOf(i), new TestObject(i)); + } + + IgniteCache<String, TestObject> cache = ignite.jcache(null); + + for (int i = 0; i < 100; i++) { + TestObject val = cache.get(String.valueOf(i)); + + assertNotNull(val); + assertEquals(i + 1, val.val); + } + } + finally { + stopAllGrids(); + } + } + + /** + * + */ + private static class TestObject { + /** Value. */ + private final int val; + + /** + * @param val Value. 
+ */ + private TestObject(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestObject obj = (TestObject)o; + + return val == obj.val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + } + + /** + * + */ + private static class TestStore extends CacheStoreAdapter<Object, Object> { + /** {@inheritDoc} */ + @Nullable @Override public Object load(Object key) { + return storeMap.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) { + storeMap.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + storeMap.remove(key); + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java index 0000000,3d100e1..bf5707e mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/IgniteDataStreamerPerformanceTest.java @@@ -1,0 -1,197 +1,196 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.processors.datastreamer; + + import org.apache.ignite.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.spi.discovery.tcp.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.testframework.junits.common.*; + import org.jdk8.backport.*; + + import java.util.concurrent.*; + -import static org.apache.ignite.cache.CacheDistributionMode.*; + import static org.apache.ignite.cache.CacheMode.*; + import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + import static org.apache.ignite.events.EventType.*; + + /** + * Data streamer performance test. Compares group lock data streamer to traditional lock. + * <p> + * Disable assertions and give at least 2 GB heap to run this test. + */ + public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int GRID_CNT = 3; + + /** */ + private static final int ENTRY_CNT = 80000; + + /** */ + private boolean useCache; + + /** */ + private String[] vals = new String[2048]; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(spi); + + cfg.setIncludeProperties(); + + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + cfg.setConnectorConfiguration(null); + + cfg.setPeerClassLoadingEnabled(true); + + if (useCache) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + - cc.setDistributionMode(PARTITIONED_ONLY); ++ cc.setNearConfiguration(null); + 
cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setStartSize(ENTRY_CNT / GRID_CNT); + cc.setSwapEnabled(false); + + cc.setBackups(1); + + cfg.setCacheSanityCheckEnabled(false); + cfg.setCacheConfiguration(cc); + } + else + cfg.setCacheConfiguration(); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + // Pre-generate the random-length string payloads used as streamed values. + for (int i = 0; i < vals.length; i++) { + int valLen = ThreadLocalRandom8.current().nextInt(128, 512); + + StringBuilder sb = new StringBuilder(); + + for (int j = 0; j < valLen; j++) + // 'a' + int is promoted to int; cast back to char so a letter is appended instead of its decimal code. + sb.append((char)('a' + ThreadLocalRandom8.current().nextInt(20))); + + vals[i] = sb.toString(); + + info("Value: " + vals[i]); + } + } + + /** + * @throws Exception If failed. + */ + public void testPerformance() throws Exception { + doTest(); + } + + /** + * @throws Exception If failed. + */ + private void doTest() throws Exception { + System.gc(); + System.gc(); + System.gc(); + + try { + useCache = true; + + startGridsMultiThreaded(GRID_CNT); + + useCache = false; + + Ignite ignite = startGrid(); + + final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null); + + ldr.perNodeBufferSize(8192); + ldr.updater(DataStreamerCacheUpdaters.<Integer, String>batchedSorted()); + ldr.autoFlushFrequency(0); + + final LongAdder cnt = new LongAdder(); + + long start = U.currentTimeMillis(); + + Thread t = new Thread(new Runnable() { + @SuppressWarnings("BusyWait") + @Override public void run() { + while (true) { + try { + Thread.sleep(10000); + } + catch (InterruptedException ignored) { + break; + } + + info(">>> Adds/sec: " + cnt.sumThenReset() / 10); + } + } + }); + + t.setDaemon(true); + + t.start(); + + int threadNum = 2;//Runtime.getRuntime().availableProcessors(); + + multithreaded(new Callable<Object>() { + @SuppressWarnings("InfiniteLoopStatement") + @Override public Object call() throws Exception { + ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); + + while (true) { + int i = 
rnd.nextInt(ENTRY_CNT); + + ldr.addData(i, vals[rnd.nextInt(vals.length)]); + + cnt.increment(); + } + } + }, threadNum, "loader"); + + info("Closing loader..."); + + ldr.close(false); + + long duration = U.currentTimeMillis() - start; + + info("Finished performance test. Duration: " + duration + "ms."); + } + finally { + stopAllGrids(); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java index 677703d,50f3145..5f58800 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java @@@ -115,9 -118,8 +118,8 @@@ public class IgfsCachePerBlockLruEvicti metaCacheCfg.setName("metaCache"); metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + metaCacheCfg.setNearConfiguration(null); metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setQueryIndexEnabled(false); metaCacheCfg.setAtomicityMode(TRANSACTIONAL); IgniteConfiguration cfg = new IgniteConfiguration(); @@@ -173,9 -174,8 +174,8 @@@ metaCacheCfg.setName("metaCache"); metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + metaCacheCfg.setNearConfiguration(null); 
metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setQueryIndexEnabled(false); metaCacheCfg.setAtomicityMode(TRANSACTIONAL); IgniteConfiguration cfg = new IgniteConfiguration(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCacheSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java index 334c998,6349cca..6457121 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java @@@ -131,9 -134,8 +134,8 @@@ public class IgfsMetricsSelfTest extend metaCacheCfg.setName("metaCache"); metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + metaCacheCfg.setNearConfiguration(null); metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setQueryIndexEnabled(false); metaCacheCfg.setAtomicityMode(TRANSACTIONAL); IgniteConfiguration cfg = new 
IgniteConfiguration(); @@@ -183,9 -184,8 +184,8 @@@ metaCacheCfg.setName("metaCache"); metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + metaCacheCfg.setNearConfiguration(null); metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setQueryIndexEnabled(false); metaCacheCfg.setAtomicityMode(TRANSACTIONAL); IgniteConfiguration cfg = new IgniteConfiguration(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsAbstractRecordResolverSelfTest.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java index 926da8d,a4afde9..476298d --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java @@@ -23,8 -24,7 +24,6 @@@ import org.apache.ignite.internal.util. import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; - import java.util.*; - -import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; import static org.apache.ignite.events.EventType.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java index 19c4551,10134b7..91994f5 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/GridCacheMultiNodeLoadTest.java @@@ -25,8 -25,9 +25,8 @@@ import org.apache.ignite.spi.discovery. 
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.junits.common.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; - import static org.apache.ignite.cache.CachePreloadMode.*; + import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/loadtests/datastructures/GridCachePartitionedAtomicLongLoadTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 55a1b58,f0cec80..68846c3 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@@ -1114,10 -1115,9 +1114,9 @@@ public abstract class GridAbstractTest CacheConfiguration cfg = new CacheConfiguration(); cfg.setStartSize(1024); - cfg.setQueryIndexEnabled(true); cfg.setAtomicWriteOrderMode(PRIMARY); cfg.setAtomicityMode(TRANSACTIONAL); - cfg.setDistributionMode(NEAR_PARTITIONED); + cfg.setNearConfiguration(new NearCacheConfiguration()); cfg.setWriteSynchronizationMode(FULL_SYNC); cfg.setEvictionPolicy(null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryModeSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index ffe0b1a,c3a3da3..9965b05 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@@ -22,8 -23,8 +23,9 @@@ import org.apache.ignite.cache.query.* import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.query.h2.opt.*; @@@ -293,19 -246,7 +247,20 @@@ public class IgniteH2Indexing implement } /** + * Drops DB schema if it exists. 
+ * + * @param schema Schema name. + * @throws IgniteCheckedException If failed to drop db schema. + */ + private void dropSchemaIfExists(String schema) throws IgniteCheckedException { - executeStatement("DROP SCHEMA IF EXISTS \"" + schema + '"'); ++ executeStatement("INFORMATION_SCHEMA", "DROP SCHEMA IF EXISTS \"" + schema + '"'); + + if (log.isDebugEnabled()) + log.debug("Dropped H2 schema for index database: " + schema); + } + + /** + * @param schema Schema * @param sql SQL statement. * @throws IgniteCheckedException If failed. */ @@@ -1300,53 -1184,18 +1198,33 @@@ }; } + /** {@inheritDoc} */ + @Override public void onCacheStarted(GridCacheContext ctx) throws IgniteCheckedException { + if (registerSpace(ctx.name())) + createSchemaIfAbsent(schema(ctx.name())); + } + + /** {@inheritDoc} */ + @Override public void onCacheStopped(GridCacheContext ctx) throws IgniteCheckedException { + if (unregisterSpace(ctx.name())) { + dropSchemaIfExists(schema(ctx.name())); + + schemas.remove(schema(ctx.name())); + } + } + /** - * Runs initial script. - * - * @throws IgniteCheckedException If failed. - * @throws SQLException If failed. - */ - private void runInitScript() throws IgniteCheckedException, SQLException { - String initScriptPath = cfg.getInitialScriptPath(); - - if (initScriptPath == null) - return; - - try (PreparedStatement p = connectionForThread(null).prepareStatement("RUNSCRIPT FROM ? CHARSET 'UTF-8'")) { - p.setString(1, initScriptPath); - - p.execute(); - } - } - - /** * Registers SQL functions. * - * @throws SQLException If failed. + * @param schema Schema. + * @param clss Classes. * @throws IgniteCheckedException If failed. 
*/ - private void createSqlFunctions() throws SQLException, IgniteCheckedException { - Class<?>[] idxCustomFuncClss = cfg.getIndexCustomFunctionClasses(); - - if (F.isEmpty(idxCustomFuncClss)) + private void createSqlFunctions(String schema, Class<?>[] clss) throws IgniteCheckedException { + if (F.isEmpty(clss)) return; - for (Class<?> cls : idxCustomFuncClss) { + for (Class<?> cls : clss) { for (Method m : cls.getDeclaredMethods()) { QuerySqlFunction ann = m.getAnnotation(QuerySqlFunction.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFieldsQuerySelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index 144655f,6a672c0..c0c54d0 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@@ -91,10 -93,24 +93,22 @@@ public class GridCacheCrossCacheQuerySe cc.setName(name); cc.setCacheMode(mode); cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cc.setPreloadMode(SYNC); + cc.setRebalanceMode(SYNC); cc.setSwapEnabled(true); - cc.setEvictNearSynchronized(false); cc.setAtomicityMode(TRANSACTIONAL); - cc.setDistributionMode(NEAR_PARTITIONED); + if (mode == CacheMode.PARTITIONED) + cc.setIndexedTypes( + Integer.class, FactPurchase.class + ); + else if (mode == CacheMode.REPLICATED) + cc.setIndexedTypes( + Integer.class, DimProduct.class, + 
Integer.class, DimStore.class + ); + else + throw new IllegalStateException("mode: " + mode); + return cc; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapAndSwapSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapAndSwapSelfTest.java index a171f98,ea34e8a..964e3a2 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapAndSwapSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapAndSwapSelfTest.java @@@ -127,9 -128,13 +127,12 @@@ public class GridCacheOffHeapAndSwapSel cacheCfg.setBackups(1); cacheCfg.setOffHeapMaxMemory(OFFHEAP_MEM); cacheCfg.setEvictSynchronized(true); - cacheCfg.setEvictNearSynchronized(true); cacheCfg.setEvictSynchronizedKeyBufferSize(1); cacheCfg.setAtomicityMode(TRANSACTIONAL); - cacheCfg.setDistributionMode(NEAR_PARTITIONED); + cacheCfg.setIndexedTypes( + Long.class, Long.class + ); + cacheCfg.setNearConfiguration(new NearCacheConfiguration()); cacheCfg.setEvictionPolicy(null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java index 84d8290,6310c39..7568f26 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java @@@ -62,9 -62,10 +62,9 @@@ public class GridIndexingWithNoopSwapSe 
cc.setCacheMode(PARTITIONED); cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cc.setPreloadMode(SYNC); + cc.setRebalanceMode(SYNC); cc.setSwapEnabled(true); - cc.setDistributionMode(CacheDistributionMode.NEAR_PARTITIONED); - cc.setEvictNearSynchronized(false); + cc.setNearConfiguration(new NearCacheConfiguration()); cc.setEvictionPolicy(new CacheFifoEvictionPolicy(1000)); cc.setBackups(1); cc.setAtomicityMode(TRANSACTIONAL);