http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a800b1a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java index 8763dd1,0000000..f4cb8a3 mode 100644,000000..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java @@@ -1,748 -1,0 +1,748 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.file.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.processors.query.h2.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Multi-threaded tests for cache queries. + */ +@SuppressWarnings("StatementWithEmptyBody") +public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTest { + /** */ + private static final boolean TEST_INFO = true; + + /** Number of test grids (nodes). Should not be less than 2. */ + private static final int GRID_CNT = 2; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static AtomicInteger idxSwapCnt = new AtomicInteger(); + + /** */ + private static AtomicInteger idxUnswapCnt = new AtomicInteger(); + + /** */ + private static final long DURATION = 30 * 1000; + + /** Don't start grid by default. */ + public IgniteCacheQueryMultiThreadedSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setDistributionMode(CacheDistributionMode.NEAR_PARTITIONED); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setSwapEnabled(true); + cacheCfg.setBackups(1); + cacheCfg.setEvictionPolicy(evictsEnabled() ? new CacheLruEvictionPolicy(100) : null); + + CacheQueryConfiguration qcfg = new CacheQueryConfiguration(); + + qcfg.setIndexPrimitiveKey(true); + + cacheCfg.setQueryConfiguration(qcfg); + + if (offheapEnabled() && evictsEnabled()) + cacheCfg.setOffHeapMaxMemory(1000); // Small offheap for evictions. + + cfg.setCacheConfiguration(cacheCfg); + + GridQueryConfiguration indexing = new GridQueryConfiguration(); + + indexing.setMaxOffheapRowsCacheSize(128); + + if (offheapEnabled()) + indexing.setMaxOffHeapMemory(0); + + cfg.setQueryConfiguration(indexing); + + GridQueryProcessor.idxCls = FakeIndexing.class; + + return cfg; + } + + /** + * + */ + private static class FakeIndexing extends IgniteH2Indexing { + @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException { + super.onSwap(spaceName, key); + + idxSwapCnt.incrementAndGet(); + } + + @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes) + throws IgniteCheckedException { + super.onUnswap(spaceName, key, val, valBytes); + + idxUnswapCnt.incrementAndGet(); + } + } + + /** @return {@code true} If offheap enabled. */ + protected boolean offheapEnabled() { + return false; + } + + /** @return {@code true} If evictions enabled. */ + protected boolean evictsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + // Clean up all caches. + for (int i = 0; i < GRID_CNT; i++) { + GridCache<Object, Object> c = grid(i).cache(null); + + assertEquals(0, c.size()); + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + assert GRID_CNT >= 2 : "Constant GRID_CNT must be greater than or equal to 2."; + + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + if (evictsEnabled()) { + assertTrue(idxSwapCnt.get() > 0); + assertTrue(idxUnswapCnt.get() > 0); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + // Clean up all caches. + for (int i = 0; i < GRID_CNT; i++) { + GridCache<Object, Object> c = grid(i).cache(null); + + c.removeAll(F.<CacheEntry<Object, Object>>alwaysTrue()); + + Iterator<Map.Entry<Object, Object>> it = c.swapIterator(); + + while (it.hasNext()) { + it.next(); + + it.remove(); + } + + it = c.offHeapIterator(); + + while (it.hasNext()) { + it.next(); + + it.remove(); + } + + assertEquals("Swap keys: " + c.swapKeys(), 0, c.swapKeys()); + assertEquals(0, c.offHeapEntriesCount()); + assertEquals(0, c.size()); + } + } + + /** {@inheritDoc} */ + @Override protected void info(String msg) { + if (TEST_INFO) + super.info(msg); + } + + /** + * @param entries Entries. + * @param g Grid. + * @return Affinity nodes. + */ + private Set<UUID> affinityNodes(Iterable<Cache.Entry<Integer, Integer>> entries, Ignite g) { + Set<UUID> nodes = new HashSet<>(); + + for (Cache.Entry<Integer, Integer> entry : entries) + nodes.add(g.cache(null).affinity().mapKeyToPrimaryAndBackups(entry.getKey()).iterator().next().id()); + + return nodes; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapString() throws Exception { + int threadCnt = 150; + final int keyCnt = 2000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, String> c = g.jcache(null); + final IgniteCache<Integer, Long> cl = g.jcache(null); + + assertEquals(0, g.cache(null).size()); + assertEquals(0, c.query(new QuerySqlPredicate<Integer, String>("1 = 1")).getAll().size()); + assertEquals(0, cl.query(new QuerySqlPredicate<Integer, Long>("1 = 1")).getAll().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { + c.put(i, String.valueOf(rnd.nextInt(valCnt))); + + if (evictsEnabled() && rnd.nextBoolean()) + c.localEvict(Arrays.asList(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + - IgniteFuture<?> fut = multithreadedAsync(new CAX() { ++ IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + switch (rnd.nextInt(5)) { + case 0: + c.put(rnd.nextInt(keyCnt), String.valueOf(rnd.nextInt(valCnt))); + + break; + case 1: + if (evictsEnabled()) + c.localEvict(Arrays.asList(rnd.nextInt(keyCnt))); + + break; + case 2: + c.remove(rnd.nextInt(keyCnt)); + + break; + case 3: + c.get(rnd.nextInt(keyCnt)); + + break; + case 4: + int from = rnd.nextInt(valCnt); + + QueryCursor<Cache.Entry<Integer, String>> qry = c.query( - new QuerySqlPredicate<Integer, String>("_val between ? and ?", String.valueOf(from), - String.valueOf(from + 250))); ++ new QuerySqlPredicate<Integer, String>("_val between ? and ?", String.valueOf(from), ++ String.valueOf(from + 250))); + + Collection<Cache.Entry<Integer, String>> res = qry.getAll(); + + for (Cache.Entry<Integer, String> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapLong() throws Exception { + int threadCnt = 150; + final int keyCnt = 2000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, Long> c = g.jcache(null); + final IgniteCache<Integer, String> c1 = g.jcache(null); + + assertEquals(0, g.cache(null).size()); + assertEquals(0, c1.query(new QuerySqlPredicate<Integer, String>("1 = 1")).getAll().size()); + assertEquals(0, c.query(new QuerySqlPredicate<Integer, Long>("1 = 1")).getAll().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { - c.put(i, (long)rnd.nextInt(valCnt)); ++ c.put(i, (long) rnd.nextInt(valCnt)); + + if (evictsEnabled() && rnd.nextBoolean()) + c.localEvict(Arrays.asList(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + - IgniteFuture<?> fut = multithreadedAsync(new CAX() { ++ IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + int key = rnd.nextInt(keyCnt); + + switch (rnd.nextInt(5)) { + case 0: - c.put(key, (long)rnd.nextInt(valCnt)); ++ c.put(key, (long) rnd.nextInt(valCnt)); + + break; + case 1: + if (evictsEnabled()) + c.localEvict(Arrays.asList(key)); + + break; + case 2: + c.remove(key); + + break; + case 3: + c.get(key); + + break; + case 4: + int from = rnd.nextInt(valCnt); + + Collection<Cache.Entry<Integer, Long>> res = c.query(new QuerySqlPredicate<Integer, Long>( + "_val between ? and ?", from, from + 250)).getAll(); + + for (Cache.Entry<Integer, Long> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapLongString() throws Exception { + int threadCnt = 150; + final int keyCnt = 2000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, Object> c = g.jcache(null); + + assertEquals(0, g.jcache(null).size()); + assertEquals(0, c.query(new QuerySqlPredicate<Integer, Object>("1 = 1")).getAll().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { + c.put(i, rnd.nextBoolean() ? (long) rnd.nextInt(valCnt) : String.valueOf(rnd.nextInt(valCnt))); + + if (evictsEnabled() && rnd.nextBoolean()) + c.localEvict(Arrays.asList(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + - IgniteFuture<?> fut = multithreadedAsync(new CAX() { ++ IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + int key = rnd.nextInt(keyCnt); + + switch (rnd.nextInt(5)) { + case 0: + c.put(key, rnd.nextBoolean() ? (long) rnd.nextInt(valCnt) : - String.valueOf(rnd.nextInt(valCnt))); ++ String.valueOf(rnd.nextInt(valCnt))); + + break; + case 1: + if (evictsEnabled()) + c.localEvict(Arrays.asList(key)); + + break; + case 2: + c.remove(key); + + break; + case 3: + c.get(key); + + break; + case 4: + int from = rnd.nextInt(valCnt); + + Collection<Cache.Entry<Integer, Object>> res = c.query( + new QuerySqlPredicate<Integer, Object>("_val between ? and ?", from, from + 250)) + .getAll(); + + for (Cache.Entry<Integer, Object> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapObject() throws Exception { + int threadCnt = 50; + final int keyCnt = 4000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, TestValue> c = g.jcache(null); + + assertEquals(0, g.cache(null).size()); + assertEquals(0, c.query(new QuerySqlPredicate<Integer, TestValue>("1 = 1")).getAll().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { + c.put(i, new TestValue(rnd.nextInt(valCnt))); + + if (evictsEnabled() && rnd.nextBoolean()) + c.localEvict(Arrays.asList(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + - IgniteFuture<?> fut = multithreadedAsync(new CAX() { ++ IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + int key = rnd.nextInt(keyCnt); + + switch (rnd.nextInt(5)) { + case 0: + c.put(key, new TestValue(rnd.nextInt(valCnt))); + + break; + case 1: + if (evictsEnabled()) + c.localEvict(Arrays.asList(key)); + + break; + case 2: + c.remove(key); + + break; + case 3: + c.get(key); + + break; + case 4: + int from = rnd.nextInt(valCnt); + + Collection<Cache.Entry<Integer, TestValue>> res = + c.query(new QuerySqlPredicate<Integer, TestValue>("TestValue.val between ? and ?", + from, from + 250)).getAll(); + + for (Cache.Entry<Integer, TestValue> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSameQuery() throws Exception { + int threadCnt = 50; + final int keyCnt = 10; + final int logMod = 5000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, Integer> c = g.jcache(null); + + for (int i = 0; i < keyCnt; i++) { + c.put(i, i); + + c.localEvict(Arrays.asList(i)); + } + + final AtomicInteger cnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + - IgniteFuture<?> fut = multithreadedAsync( ++ IgniteInternalFuture<?> fut = multithreadedAsync( + new CAX() { + @Override public void applyx() throws IgniteCheckedException { + int iter = 0; + + while (!done.get() && !Thread.currentThread().isInterrupted()) { + iter++; + + Collection<Cache.Entry<Integer, Integer>> entries = + c.query(new QuerySqlPredicate<Integer, Integer>("_val >= 0")).getAll(); + + assert entries != null; + + assertEquals("Query results [entries=" + entries + ", aff=" + affinityNodes(entries, g) + + ", iteration=" + iter + ']', keyCnt, entries.size()); + + if (cnt.incrementAndGet() % logMod == 0) { + GridCacheQueryManager<Object, Object> qryMgr = - ((GridKernal)g).internalCache().context().queries(); ++ ((IgniteKernal)g).internalCache().context().queries(); + + assert qryMgr != null; + + qryMgr.printMemoryStats(); + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + info("Finishing test..."); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedNewQueries() throws Exception { + int threadCnt = 50; + final int keyCnt = 10; + final int logMod = 5000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, Integer> c = g.jcache(null); + + for (int i = 0; i < keyCnt; i++) { + c.put(i, i); + + c.localEvict(Arrays.asList(i)); + } + + final AtomicInteger cnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + - IgniteFuture<?> fut = multithreadedAsync(new CAX() { ++ IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + int iter = 0; + + while (!done.get() && !Thread.currentThread().isInterrupted()) { + iter++; + + Collection<Cache.Entry<Integer, Integer>> entries = + c.query(new QuerySqlPredicate<Integer, Integer>("_val >= 0")).getAll(); + + assert entries != null; + + assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size()); + + if (cnt.incrementAndGet() % logMod == 0) { + GridCacheQueryManager<Object, Object> qryMgr = - ((GridKernal)g).internalCache().context().queries(); ++ ((IgniteKernal)g).internalCache().context().queries(); + + assert qryMgr != null; + + qryMgr.printMemoryStats(); + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedScanQuery() throws Exception { + int threadCnt = 50; + final int keyCnt = 500; + final int logMod = 5000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, Integer> c = g.jcache(null); + + for (int i = 0; i < keyCnt; i++) + c.put(i, i); + + final AtomicInteger cnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + - IgniteFuture<?> fut = multithreadedAsync( ++ IgniteInternalFuture<?> fut = multithreadedAsync( + new CAX() { + @Override public void applyx() throws IgniteCheckedException { + int iter = 0; + + while (!done.get() && !Thread.currentThread().isInterrupted()) { + iter++; + + // Scan query. + Collection<Cache.Entry<Integer, Integer>> entries = + c.query(new QueryPredicate<Integer, Integer>() { + @Override public boolean apply(Cache.Entry<Integer, Integer> integerIntegerEntry) { + return true; + } + }).getAll(); + + assert entries != null; + + assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size()); + + if (cnt.incrementAndGet() % logMod == 0) { + GridCacheQueryManager<Object, Object> qryMgr = - ((GridKernal)g).internalCache().context().queries(); ++ ((IgniteKernal)g).internalCache().context().queries(); + + assert qryMgr != null; + + qryMgr.printMemoryStats(); + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * Test value. + */ + private static class TestValue implements Serializable { + /** Value. */ + @CacheQuerySqlField + private int val; + + /** + * @param val Value. + */ + private TestValue(int val) { + this.val = val; + } + + /** + * @return Value. + */ + public int value() { + return val; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a800b1a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java index a8da960,0000000..74c31e1 mode 100644,000000..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java @@@ -1,223 -1,0 +1,224 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; ++import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import javax.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Test for distributed queries with node restarts. + */ +public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int GRID_CNT = 3; + + /** */ + private static final int KEY_CNT = 1000; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 90 * 1000; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setBackups(1); + cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(NEAR_PARTITIONED); + + CacheQueryConfiguration qcfg = new CacheQueryConfiguration(); + + qcfg.setIndexPrimitiveKey(true); + + cc.setQueryConfiguration(qcfg); + + c.setCacheConfiguration(cc); + + return c; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testRestarts() throws Exception { + int duration = 60 * 1000; + int qryThreadNum = 10; + final long nodeLifeTime = 2 * 1000; + final int logFreq = 20; + + final IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + assert cache != null; + + for (int i = 0; i < KEY_CNT; i++) + cache.put(i, i); + + assertEquals(KEY_CNT, cache.size()); + + final AtomicInteger qryCnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + - IgniteFuture<?> fut1 = multithreadedAsync(new CAX() { ++ IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + while (!done.get()) { + Collection<Cache.Entry<Integer, Integer>> res = + cache.query(new QuerySqlPredicate<Integer, Integer>("_val >= 0")).getAll(); + + assertFalse(res.isEmpty()); + + int c = qryCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Executed queries: " + c); + } + } + }, qryThreadNum); + + final AtomicInteger restartCnt = new AtomicInteger(); + + CollectingEventListener lsnr = new CollectingEventListener(); + + for (int i = 0; i < GRID_CNT; i++) + grid(i).events().localListen(lsnr, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED); + - IgniteFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { ++ IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { + @SuppressWarnings({"BusyWait"}) + @Override public Object call() throws Exception { + while (!done.get()) { + int idx = GRID_CNT; + + startGrid(idx); + + Thread.sleep(nodeLifeTime); + + stopGrid(idx); + + int c = restartCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Node restarts: " + c); + } + + return true; + } + }, 1); + + Thread.sleep(duration); + + done.set(true); + + fut1.get(); + fut2.get(); + + info("Awaiting preload events [restartCnt=" + restartCnt.get() + ']'); + + boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000); + + for (int i = 0; i < GRID_CNT; i++) + grid(i).events().stopLocalListen(lsnr, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED); + + assert success; + } + + /** Listener that will wait for specified number of events received. */ + private class CollectingEventListener implements IgnitePredicate<IgniteEvent> { + /** Registered events count. */ + private int evtCnt; + + /** {@inheritDoc} */ + @Override public synchronized boolean apply(IgniteEvent evt) { + evtCnt++; + + info("Processed event [evt=" + evt + ", evtCnt=" + evtCnt + ']'); + + notifyAll(); + + return true; + } + + /** + * Waits until total number of events processed is equal or greater then argument passed. + * + * @param cnt Number of events to wait. + * @param timeout Timeout to wait. + * @return {@code True} if successfully waited, {@code false} if timeout happened. + * @throws InterruptedException If thread is interrupted. + */ + public synchronized boolean awaitEvents(int cnt, long timeout) throws InterruptedException { + long start = U.currentTimeMillis(); + + long now = start; + + while (start + timeout > now) { + if (evtCnt >= cnt) + return true; + + wait(start + timeout - now); + + now = U.currentTimeMillis(); + } + + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a800b1a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQuerySelfTest.java index 33012a2,0000000..e122e0c mode 100644,000000..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQuerySelfTest.java @@@ -1,89 -1,0 +1,80 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.replicated; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; - import org.apache.ignite.cluster.*; - import org.apache.ignite.events.*; - import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; - import org.apache.ignite.lang.*; - import org.apache.ignite.internal.processors.query.*; - import org.apache.ignite.internal.util.future.*; - import org.apache.ignite.internal.util.typedef.*; - import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; + +import java.util.*; +import java.util.concurrent.*; + - import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Tests for fields queries. + */ +public class IgniteCacheReplicatedFieldsQuerySelfTest extends IgniteCacheAbstractFieldsQuerySelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** + * @throws Exception If failed. + */ + public void testLostIterator() throws Exception { + IgniteCache<Object, Object> cache = grid(0).jcache(null); + + QueryCursor<List<?>> qry = null; + + int maximumQueryIteratorCount = + cache.getConfiguration(CacheConfiguration.class).getMaximumQueryIteratorCount(); + + for (int i = 0; i < maximumQueryIteratorCount + 1; i++) { + QueryCursor<List<?>> q = cache + .queryFields(new QuerySqlPredicate<>("select _key from Integer where _key >= 0 order by _key", 50)); + + assertEquals(0, q.iterator().next().get(0)); + + if (qry == null) + qry = q; + } + + final QueryCursor<List<?>> qry0 = qry; + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override + public Object call() throws Exception { + int i = 0; + + for (List<?> row : qry0) + assertEquals(++i % 50, row.get(0)); + + return null; + } + }, IgniteException.class, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a800b1a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java index d5a8ca2,0000000..4929ab6 mode 100644,000000..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java @@@ -1,575 -1,0 +1,575 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.replicated; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.springframework.util.*; + +import java.io.*; +import java.lang.reflect.*; +import java.sql.*; +import java.util.*; +import java.util.concurrent.*; + - import javax.cache.Cache; ++import javax.cache.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Tests replicated query. + */ +public class IgniteCacheReplicatedQuerySelfTest extends IgniteCacheAbstractQuerySelfTest { + /** */ + private static final boolean TEST_DEBUG = false; + + /** Grid1. */ + private static Ignite ignite1; + + /** Grid2. */ + private static Ignite ignite2; + + /** Grid3. */ + private static Ignite ignite3; + + /** Cache1. */ + private static IgniteCache<CacheKey, CacheValue> cache1; + + /** Cache2. */ + private static IgniteCache<CacheKey, CacheValue> cache2; + + /** Cache3. */ + private static IgniteCache<CacheKey, CacheValue> cache3; + + /** Key serialization cnt. */ + private static volatile int keySerCnt; + + /** Key deserialization count. */ + private static volatile int keyDesCnt; + + /** Value serialization count. */ + private static volatile int valSerCnt; + + /** Value deserialization count. */ + private static volatile int valDesCnt; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + ignite1 = grid(0); + ignite2 = grid(1); + ignite3 = grid(2); + + cache1 = ignite1.jcache(null); + cache2 = ignite2.jcache(null); + cache3 = ignite3.jcache(null); + } + + /** + * @throws Exception If failed. + */ + public void testClientOnlyNode() throws Exception { + try { + Ignite g = startGrid("client"); + + IgniteCache<Integer, Integer> c = g.jcache(null); + + for (int i = 0; i < 10; i++) + c.put(i, i); + + // Client cache should be empty. + assertEquals(0, c.size()); + + Collection<Cache.Entry<Integer, Integer>> res = + c.query(new QuerySqlPredicate<Integer, Integer>("_key >= 5 order by _key")).getAll(); + + assertEquals(5, res.size()); + + int i = 5; + + for (Cache.Entry<Integer, Integer> e : res) { + assertEquals(i, e.getKey().intValue()); + assertEquals(i, e.getValue().intValue()); + + i++; + } + } + finally { + stopGrid("client"); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testIterator() throws Exception { + int keyCnt = 100; + + for (int i = 0; i < keyCnt; i++) + cache1.put(new CacheKey(i), new CacheValue("val" + i)); + + assertEquals(keyCnt, cache1.size()); + assertEquals(keyCnt, cache2.size()); + assertEquals(keyCnt, cache3.size()); + + QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry = + cache1.query(new QuerySqlPredicate<CacheKey, CacheValue>("select * from CacheValue", 10, new Object[0])); + + Iterator<Cache.Entry<CacheKey, CacheValue>> iter = qry.iterator(); + + assert iter.hasNext(); + + int cnt = 0; + + while (iter.hasNext()) { + iter.next(); + + cnt++; + } + + // Expect duplicates since we run query on full projection of 3 nodes and dedup flag is false. + assertEquals(keyCnt * 3, cnt); + } + + /** + * @throws Exception If test failed. + */ + public void testLocalQuery() throws Exception { + cache1.removeAll(); + + IgniteTx tx = ignite1.transactions().txStart(); + + try { + cache1.put(new CacheKey(1), new CacheValue("1")); + cache1.put(new CacheKey(2), new CacheValue("2")); + cache1.put(new CacheKey(3), new CacheValue("3")); + cache1.put(new CacheKey(4), new CacheValue("4")); + + tx.commit(); + + info("Committed transaction: " + tx); + } + catch (IgniteCheckedException e) { + tx.rollback(); + + throw e; + } + + checkQueryResults(cache1); + checkQueryResults(cache2); + checkQueryResults(cache3); + } + + /** + * @throws Exception If test failed. + */ + public void testDistributedQuery() throws Exception { + int keyCnt = 4; + + final CountDownLatch latch = new CountDownLatch(keyCnt * 2); + + IgnitePredicate<IgniteEvent> lsnr = new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + latch.countDown(); + + return true; + } + }; + + ignite2.events().localListen(lsnr, IgniteEventType.EVT_CACHE_OBJECT_PUT); + ignite3.events().localListen(lsnr, IgniteEventType.EVT_CACHE_OBJECT_PUT); + + IgniteTx tx = ignite1.transactions().txStart(); + + try { + for (int i = 1; i <= keyCnt; i++) + cache1.put(new CacheKey(i), new CacheValue(String.valueOf(i))); + + tx.commit(); + + info("Committed transaction: " + tx); + } + catch (IgniteCheckedException e) { + tx.rollback(); + + throw e; + } + + latch.await(); + + QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry = + cache1.query(new QuerySqlPredicate<CacheKey, CacheValue>("val > 1 and val < 4")); + + // Distributed query. + assertEquals(6, qry.getAll().size()); + + // Create new query, old query cannot be modified after it has been executed. + qry = cache3.localQuery(new QuerySqlPredicate<CacheKey, CacheValue>("val > 1 and val < 4")); + + // Tests execute on node. + Iterator<Cache.Entry<CacheKey, CacheValue>> iter = qry.iterator(); + + assert iter != null; + assert iter.hasNext(); + + iter.next(); + + assert iter.hasNext(); + + iter.next(); + + assert !iter.hasNext(); + } + + /** + * Returns private field {@code qryIters} of {@link GridCacheQueryManager} for the given grid. + * + * @param g Grid which {@link GridCacheQueryManager} should be observed. + * @return {@code qryIters} of {@link GridCacheQueryManager}. + */ + private ConcurrentMap<UUID, + Map<Long, GridFutureAdapter<GridCloseableIterator<IgniteBiTuple<CacheKey, CacheValue>>>>> + distributedQueryManagerQueryItersMap(Ignite g) { - GridCacheContext ctx = ((GridKernal)g).internalCache().context(); ++ GridCacheContext ctx = ((IgniteKernal)g).internalCache().context(); + + Field qryItersField = ReflectionUtils.findField(ctx.queries().getClass(), "qryIters"); + + qryItersField.setAccessible(true); + + return (ConcurrentMap<UUID, + Map<Long, GridFutureAdapter<GridCloseableIterator<IgniteBiTuple<CacheKey, CacheValue>>>>>) + ReflectionUtils.getField(qryItersField, ctx.queries()); + } + + /** + * @throws Exception If test failed. + */ + public void testToString() throws Exception { + int keyCnt = 4; + + for (int i = 1; i <= keyCnt; i++) + cache1.put(new CacheKey(i), new CacheValue(String.valueOf(i))); + + // Create query with key filter. + + QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry = + cache1.query(new QuerySqlPredicate<CacheKey, CacheValue>("val > 0")); + + assertEquals(keyCnt * 3, qry.getAll().size()); + + info("Query result: " + qry.getAll()); + } + + /** + * @throws Exception If failed. + */ + public void testLostIterator() throws Exception { + IgniteCache<Integer, Integer> cache = ignite.jcache(null); + + for (int i = 0; i < 1000; i++) + cache.put(i, i); + + QueryCursor<Cache.Entry<Integer, Integer>> fut = null; + + for (int i = 0; i < cache.getConfiguration(CacheConfiguration.class).getMaximumQueryIteratorCount() + 1; i++) { + QueryCursor<Cache.Entry<Integer, Integer>> q = + cache.query(new QuerySqlPredicate<Integer, Integer>("_key >= 0 order by _key")); + + assertEquals(0, (int)q.iterator().next().getKey()); + + if (fut == null) + fut = q; + } + + final QueryCursor<Cache.Entry<Integer, Integer>> fut0 = fut; + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + int i = 0; + + Cache.Entry<Integer, Integer> e; + + while ((e = fut0.iterator().next()) != null) + assertEquals(++i, (int)e.getKey()); + + return null; + } + }, IgniteException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testNodeLeft() throws Exception { + try { + Ignite g = startGrid(); + + IgniteCache<Integer, Integer> cache = g.jcache(null); + + for (int i = 0; i < 1000; i++) + cache.put(i, i); + + QueryCursor<Cache.Entry<Integer, Integer>> q = + cache.query(new QuerySqlPredicate<Integer, Integer>("_key >= 0 order by _key", 50, new Object[0])); + + assertEquals(0, (int) q.iterator().next().getKey()); + + final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<GridCloseableIterator< + IgniteBiTuple<Integer, Integer>>>>> map = - U.field(((GridKernal)grid(0)).internalCache().context().queries(), "qryIters"); ++ U.field(((IgniteKernal)grid(0)).internalCache().context().queries(), "qryIters"); + + // fut.nextX() does not guarantee the request has completed on remote node + // (we could receive page from local one), so we need to wait. + assertTrue(GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return map.size() == 1; + } + }, getTestTimeout())); + + Map<Long, GridFutureAdapter<GridCloseableIterator<IgniteBiTuple<Integer, Integer>>>> futs = + map.get(g.cluster().localNode().id()); + + assertEquals(1, futs.size()); + + GridCloseableIterator<IgniteBiTuple<Integer, Integer>> iter = - (GridCloseableIterator<IgniteBiTuple<Integer, Integer>>)((IgniteFuture)F.first(futs.values()).get()).get(); ++ (GridCloseableIterator<IgniteBiTuple<Integer, Integer>>)((IgniteInternalFuture)F.first(futs.values()).get()).get(); + + ResultSet rs = U.field(iter, "data"); + + assertFalse(rs.isClosed()); + + final UUID nodeId = g.cluster().localNode().id(); + final CountDownLatch latch = new CountDownLatch(1); + + grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + if (((IgniteDiscoveryEvent)evt).eventNode().id().equals(nodeId)) + latch.countDown(); + + return true; + } + }, EVT_NODE_LEFT); + + stopGrid(); + + latch.await(); + + assertEquals(0, map.size()); + assertTrue(rs.isClosed()); + } + finally { + // Ensure that additional node is stopped. + stopGrid(); + } + } + + /** + * @param cache Cache. + * @throws Exception If check failed. + */ + private void checkQueryResults(IgniteCache<CacheKey, CacheValue> cache) throws Exception { + QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry = + cache.localQuery(new QuerySqlPredicate<CacheKey, CacheValue>("val > 1 and val < 4")); + + Iterator<Cache.Entry<CacheKey, CacheValue>> iter = qry.iterator(); + + assert iter != null; + assert iter.hasNext(); + + Cache.Entry<CacheKey, CacheValue> entry = iter.next(); + + assert entry.getKey().equals(new CacheKey(2)) || entry.getKey().equals(new CacheKey(3)); + + assert iter.hasNext(); + + entry = iter.next(); + + assert entry.getKey().equals(new CacheKey(2)) || entry.getKey().equals(new CacheKey(3)); + assert !iter.hasNext(); + } + + /** + * Cache key. + */ + private static class CacheKey implements Externalizable { + /** Key. */ + private int key; + + /** + * @param key Key. + */ + CacheKey(int key) { + this.key = key; + } + + /** + * + */ + public CacheKey() { + /* No-op. */ + } + + /** + * @return Key. + */ + public int getKey() { + return key; + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + key = in.readInt(); + + keyDesCnt++; + + if (TEST_DEBUG) + X.println("Deserialized demo key [keyDesCnt=" + keyDesCnt + ", key=" + this + ']'); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(key); + + keySerCnt++; + + if (TEST_DEBUG) + X.println("Serialized demo key [serCnt=" + keySerCnt + ", key=" + this + ']'); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + CacheKey cacheKey; + + if (o instanceof CacheKey) + cacheKey = (CacheKey)o; + else + return false; + + return key == cacheKey.key; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheKey.class, this); + } + } + + /** + * Cache value.. + */ + private static class CacheValue implements Externalizable { + /** Value. */ + @CacheQuerySqlField + private String val; + + /** + * @param val Value. + */ + CacheValue(String val) { + this.val = val; + } + + /** + * + */ + public CacheValue() { + /* No-op. */ + } + + /** + * @return Value. + */ + public String getValue() { + return val; + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + val = U.readString(in); + + valDesCnt++; + + if (TEST_DEBUG) + X.println("Deserialized demo value [valDesCnt=" + valDesCnt + ", val=" + this + ']'); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, val); + + valSerCnt++; + + if (TEST_DEBUG) + X.println("Serialized demo value [serCnt=" + valSerCnt + ", val=" + this + ']'); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + CacheValue val = (CacheValue)o; + + return !(this.val != null ? !this.val.equals(val.val) : val.val != null); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val != null ? val.hashCode() : 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheValue.class, this); + } + } +}