http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java deleted file mode 100644 index dd65bd8..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java +++ /dev/null @@ -1,964 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; - -/** - * Tests events. - */ -public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTest { - /** */ - private static final boolean TEST_INFO = true; - - /** Wait timeout. */ - private static final long WAIT_TIMEOUT = 5000; - - /** Key. */ - private static final String KEY = "key"; - - /** */ - private static volatile int gridCnt; - - /** - * @return {@code True} if partitioned. - */ - protected boolean partitioned() { - return false; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - gridCnt = gridCount(); - - for (int i = 0; i < gridCnt; i++) - grid(i).events().localListen(new TestEventListener(partitioned()), EVTS_CACHE); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - if (TEST_INFO) - info("Called beforeTest() callback."); - - TestEventListener.reset(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - if (TEST_INFO) - info("Called afterTest() callback."); - - TestEventListener.stopListen(); - - try { - super.afterTest(); - } - finally { - TestEventListener.listen(); - } - } - - /** - * Waits for event count on all nodes. - * - * @param gridIdx Grid index. - * @param evtCnts Array of tuples with values: V1 - event type, V2 - expected event count on one node. - * @throws InterruptedException If thread has been interrupted while waiting. - */ - private void waitForEvents(int gridIdx, IgniteBiTuple<Integer, Integer>... evtCnts) throws Exception { - if (!F.isEmpty(evtCnts)) - try { - TestEventListener.waitForEventCount(((GridKernal)grid(0)).context(), evtCnts); - } - catch (IgniteCheckedException e) { - printEventCounters(gridIdx, evtCnts); - - throw e; - } - } - - /** - * @param gridIdx Grid index. - * @param expCnts Expected counters - */ - private void printEventCounters(int gridIdx, IgniteBiTuple<Integer, Integer>[] expCnts) { - info("Printing counters [gridIdx=" + gridIdx + ']'); - - for (IgniteBiTuple<Integer, Integer> t : expCnts) { - Integer evtType = t.get1(); - - int actCnt = TestEventListener.eventCount(evtType); - - info("Event [evtType=" + evtType + ", expCnt=" + t.get2() + ", actCnt=" + actCnt + ']'); - } - } - - /** - * Clear caches without generating events. - * - * @throws IgniteCheckedException If failed to clear caches. - */ - private void clearCaches() throws IgniteCheckedException { - for (int i = 0; i < gridCnt; i++) { - GridCache<String, Integer> cache = cache(i); - - cache.removeAll(); - - assert cache.isEmpty(); - } - } - - /** - * Runs provided {@link TestCacheRunnable} instance on all caches. - * - * @param run {@link TestCacheRunnable} instance. - * @param evtCnts Expected event counts for each iteration. - * @throws Exception In failed. - */ - @SuppressWarnings({"CaughtExceptionImmediatelyRethrown"}) - private void runTest(TestCacheRunnable run, IgniteBiTuple<Integer, Integer>... evtCnts) throws Exception { - for (int i = 0; i < gridCount(); i++) { - info(">>> Running test for grid [idx=" + i + ", grid=" + grid(i).name() + - ", id=" + grid(i).localNode().id() + ']'); - - try { - run.run(cache(i)); - - waitForEvents(i, evtCnts); - } - catch (Exception e) { // Leave this catch to be able to set breakpoint. - throw e; - } - finally { - // This call is mainly required to correctly clear event futures. - TestEventListener.reset(); - - clearCaches(); - - // This call is required for the second time to reset counters for - // the previous call. - TestEventListener.reset(); - } - } - } - - /** - * Get key-value pairs. - * - * @param size Pairs count. - * @return Key-value pairs. - */ - private Map<String, Integer> pairs(int size) { - Map<String, Integer> pairs = new HashMap<>(size); - - for (int i = 1; i <= size; i++) - pairs.put(KEY + i, i); - - return pairs; - } - - /** - * @throws Exception If test failed. - */ - public void testFilteredPut() throws Exception { - GridCache<String, Integer> cache = grid(0).cache(null); - - String key = "1"; - int val = 1; - - assert !cache.putx(key, val, F.<String, Integer>cacheHasPeekValue()); - - assert !cache.containsKey(key); - - assertEquals(0, TestEventListener.eventCount(EVT_CACHE_OBJECT_PUT)); - - assert cache.putx(key, val); - - assert cache.containsKey(key); - - waitForEvents(0, F.t(EVT_CACHE_OBJECT_PUT, gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testGetPutRemove() throws Exception { - // TODO: GG-7578. - if (cache(0).configuration().getCacheMode() == GridCacheMode.REPLICATED) - return; - - runTest( - new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - String key = "key"; - Integer val = 1; - - assert cache.put(key, val) == null; - - assert cache.containsKey(key); - - assertEquals(val, cache.get(key)); - - assertEquals(val, cache.remove(key)); - - assert !cache.containsKey(key); - } - }, - F.t(EVT_CACHE_OBJECT_PUT, gridCnt), - F.t(EVT_CACHE_OBJECT_READ, 3), - F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt) - ); - } - - /** - * @throws Exception If test failed. - */ - public void testGetPutRemoveTx1() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Map.Entry<String, Integer> e = F.first(pairs(1).entrySet()); - - assert e != null; - - String key = e.getKey(); - Integer val = e.getValue(); - - IgniteTx tx = cache.txStart(); - - assert cache.put(key, val) == null; - - assert cache.containsKey(key); - - assert val.equals(cache.get(key)); - - assert val.equals(cache.remove(key)); - - assert !cache.containsKey(key); - - tx.commit(); - - assert !cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testGetPutRemoveTx2() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Map.Entry<String, Integer> e = F.first(pairs(1).entrySet()); - - assert e != null; - - String key = e.getKey(); - Integer val = e.getValue(); - - IgniteTx tx = cache.txStart(); - - assert cache.put(key, val) == null; - - assert cache.containsKey(key); - - assert val.equals(cache.get(key)); - - assert val.equals(cache.remove(key)); - - assert !cache.containsKey(key); - - assert cache.put(key, val) == null; - - assert cache.containsKey(key); - - tx.commit(); - - assert cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testGetPutRemoveAsync() throws Exception { - // TODO: GG-7578. - if (cache(0).configuration().getCacheMode() == GridCacheMode.REPLICATED) - return; - - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Map.Entry<String, Integer> e = F.first(pairs(1).entrySet()); - - assert e != null; - - String key = e.getKey(); - Integer val = e.getValue(); - - assert cache.putAsync(key, val).get() == null; - - assert cache.containsKey(key); - - assert val.equals(cache.getAsync(key).get()); - - assert val.equals(cache.removeAsync(key).get()); - - assert !cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt), F.t(EVT_CACHE_OBJECT_READ, 3), F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testGetPutRemoveAsyncTx1() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Map.Entry<String, Integer> e = F.first(pairs(1).entrySet()); - - assert e != null; - - String key = e.getKey(); - Integer val = e.getValue(); - - IgniteTx tx = cache.txStart(); - - assert cache.putAsync(key, val).get() == null; - - assert cache.containsKey(key); - - assert val.equals(cache.getAsync(key).get()); - - assert val.equals(cache.removeAsync(key).get()); - - assert !cache.containsKey(key); - - tx.commit(); - - assert !cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testGetPutRemoveAsyncTx2() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Map.Entry<String, Integer> e = F.first(pairs(1).entrySet()); - - assert e != null; - - String key = e.getKey(); - Integer val = e.getValue(); - - IgniteTx tx = cache.txStart(); - - assert cache.putAsync(key, val).get() == null; - - assert cache.containsKey(key); - - assert val.equals(cache.getAsync(key).get()); - - assert val.equals(cache.removeAsync(key).get()); - - assert !cache.containsKey(key); - - assert cache.putAsync(key, val).get() == null; - - assert cache.containsKey(key); - - tx.commit(); - - assert cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testPutRemovex() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Map.Entry<String, Integer> e = F.first(pairs(1).entrySet()); - - assert e != null; - - String key = e.getKey(); - Integer val = e.getValue(); - - assert cache.putx(key, val); - - assert cache.containsKey(key); - - assert cache.removex(key); - - assert !cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt), F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testPutRemovexTx1() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Map.Entry<String, Integer> e = F.first(pairs(1).entrySet()); - - assert e != null; - - String key = e.getKey(); - Integer val = e.getValue(); - - IgniteTx tx = cache.txStart(); - - assert cache.putx(key, val); - - assert cache.containsKey(key); - - assert cache.removex(key); - - assert !cache.containsKey(key); - - tx.commit(); - - assert !cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testPutRemovexTx2() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Map.Entry<String, Integer> e = F.first(pairs(1).entrySet()); - - assert e != null; - - String key = e.getKey(); - Integer val = e.getValue(); - - IgniteTx tx = cache.txStart(); - - assert cache.putx(key, val); - - assert cache.containsKey(key); - - assert cache.removex(key); - - assert !cache.containsKey(key); - - assert cache.putx(key, val); - - assert cache.containsKey(key); - - tx.commit(); - - assert cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testPutIfAbsent() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator(); - - Map.Entry<String, Integer> e = iter.next(); - - String key = e.getKey(); - Integer val = e.getValue(); - - assert cache.putIfAbsent(key, val) == null; - assert val.equals(cache.putIfAbsent(key, val)); - - assert cache.containsKey(key); - - e = iter.next(); - - key = e.getKey(); - val = e.getValue(); - - assert cache.putxIfAbsent(key, val); - assert !cache.putxIfAbsent(key, val); - - assert cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testPutIfAbsentTx() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator(); - - IgniteTx tx = cache.txStart(); - - Map.Entry<String, Integer> e = iter.next(); - - String key = e.getKey(); - Integer val = e.getValue(); - - assert cache.putIfAbsent(key, val) == null; - - assertEquals(val, cache.putIfAbsent(key, val)); - - assert cache.containsKey(key); - - e = iter.next(); - - key = e.getKey(); - val = e.getValue(); - - assert cache.putxIfAbsent(key, val); - assert !cache.putxIfAbsent(key, val); - - assert cache.containsKey(key); - - tx.commit(); - - assert cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testPutIfAbsentAsync() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator(); - - Map.Entry<String, Integer> e = iter.next(); - - String key = e.getKey(); - Integer val = e.getValue(); - - assert cache.putIfAbsentAsync(key, val).get() == null; - assert val.equals(cache.putIfAbsentAsync(key, val).get()); - - assert cache.containsKey(key); - - e = iter.next(); - - key = e.getKey(); - val = e.getValue(); - - assert cache.putxIfAbsentAsync(key, val).get(); - assert !cache.putxIfAbsentAsync(key, val).get(); - - assert cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - @SuppressWarnings("unchecked") - public void testPutIfAbsentAsyncTx() throws Exception { - IgniteBiTuple[] evts = new IgniteBiTuple[] {F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt), F.t(EVT_CACHE_OBJECT_READ, 1)}; - - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator(); - - // Optimistic transaction. - IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ); - - Map.Entry<String, Integer> e = iter.next(); - - String key = e.getKey(); - Integer val = e.getValue(); - - assert cache.putIfAbsentAsync(key, val).get() == null; - assert val.equals(cache.putIfAbsentAsync(key, val).get()); - - assert cache.containsKey(key); - - e = iter.next(); - - key = e.getKey(); - val = e.getValue(); - - assert cache.putxIfAbsentAsync(key, val).get(); - assert !cache.putxIfAbsentAsync(key, val).get(); - - assert cache.containsKey(key); - - tx.commit(); - - assert cache.containsKey(key); - } - }, evts); - } - - /** - * @throws Exception If test failed. - */ - public void testFilteredPutRemovex() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Map.Entry<String, Integer> e = F.first(pairs(1).entrySet()); - - assert e != null; - - IgnitePredicate<GridCacheEntry<String, Integer>> noPeekVal = F.cacheNoPeekValue(); - IgnitePredicate<GridCacheEntry<String, Integer>> hasPeekVal = F.cacheHasPeekValue(); - - String key = e.getKey(); - Integer val = e.getValue(); - - assert !cache.putx(key, val, hasPeekVal); - assert cache.putx(key, val, noPeekVal); - - assert cache.containsKey(key); - - assert !cache.removex(key, noPeekVal); - assert cache.removex(key, hasPeekVal); - - assert !cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt), F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testFilteredPutRemovexTx1() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - assert cache.keySet().isEmpty() : "Key set is not empty: " + cache().keySet(); - - Map.Entry<String, Integer> e = F.first(pairs(1).entrySet()); - - assert e != null; - - IgnitePredicate<GridCacheEntry<String, Integer>> noPeekVal = F.cacheNoPeekValue(); - IgnitePredicate<GridCacheEntry<String, Integer>> hasPeekVal = F.cacheHasPeekValue(); - - String key = e.getKey(); - Integer val = e.getValue(); - - // Optimistic. - IgniteTx tx = cache.txStart(); - - assert !cache.putx(key, val, hasPeekVal); - assert cache.putx(key, val, noPeekVal); - - assert cache.containsKey(key); - - assert !cache.removex(key, noPeekVal); - assert cache.removex(key); - - assert !cache.containsKey(key); - - tx.commit(); - - assert !cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt)); - } - - /** - * @throws Exception If test failed. - */ - public void testFilteredPutRemovexTx2() throws Exception { - runTest(new TestCacheRunnable() { - @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { - Map.Entry<String, Integer> e = F.first(pairs(1).entrySet()); - - assert e != null; - - IgnitePredicate<GridCacheEntry<String, Integer>> noPeekVal = F.cacheNoPeekValue(); - IgnitePredicate<GridCacheEntry<String, Integer>> hasPeekVal = F.cacheHasPeekValue(); - - String key = e.getKey(); - Integer val = e.getValue(); - - IgniteTx tx = cache.txStart(); - - assert !cache.putx(key, val, hasPeekVal); - assert cache.putx(key, val, noPeekVal); - - assert cache.containsKey(key); - - assert !cache.removex(key, noPeekVal); - assert cache.removex(key, hasPeekVal); - - assert !cache.containsKey(key); - - assert !cache.putx(key, val, hasPeekVal); - assert cache.putx(key, val, noPeekVal); - - assert cache.containsKey(key); - - tx.commit(); - - assert cache.containsKey(key); - } - }, F.t(EVT_CACHE_OBJECT_PUT, gridCnt)); - } - - /** - * - */ - private static interface TestCacheRunnable { - /** - * @param cache Cache. - * @throws IgniteCheckedException If any exception occurs. - */ - void run(GridCache<String, Integer> cache) throws IgniteCheckedException; - } - - /** - * Local event listener. - */ - private static class TestEventListener implements IgnitePredicate<IgniteEvent> { - /** Events count map. */ - private static ConcurrentMap<Integer, AtomicInteger> cntrs = new ConcurrentHashMap<>(); - - /** Event futures. */ - private static Collection<EventTypeFuture> futs = new GridConcurrentHashSet<>(); - - /** */ - private static volatile boolean listen = true; - - /** */ - private static boolean partitioned; - - /** - * @param p Partitioned flag. - */ - private TestEventListener(boolean p) { - partitioned = p; - } - - /** - * - */ - private static void listen() { - listen = true; - } - - /** - * - */ - private static void stopListen() { - listen = false; - } - - /** - * @param type Event type. - * @return Count. - */ - static int eventCount(int type) { - assert type > 0; - - AtomicInteger cntr = cntrs.get(type); - - return cntr != null ? cntr.get() : 0; - } - - /** - * Reset listener. - */ - static void reset() { - cntrs.clear(); - - futs.clear(); - } - - /** {@inheritDoc} */ - @Override public boolean apply(IgniteEvent evt) { - assert evt instanceof IgniteCacheEvent; - - if (!listen) - return true; - - if (TEST_INFO) - X.println("Cache event: " + evt.shortDisplay()); - - AtomicInteger cntr = F.addIfAbsent(cntrs, evt.type(), F.newAtomicInt()); - - assert cntr != null; - - int cnt = cntr.incrementAndGet(); - - for (EventTypeFuture f : futs) - f.onEvent(evt.type(), cnt); - - return true; - } - - /** - * Waits for event count. - * - * @param ctx Kernal context. - * @param evtCnts Array of tuples with values: V1 - event type, V2 - expected event count. - * @throws IgniteCheckedException If failed to wait. - */ - private static void waitForEventCount(GridKernalContext ctx, - IgniteBiTuple<Integer, Integer>... evtCnts) throws IgniteCheckedException { - if (F.isEmpty(evtCnts)) - return; - - // Create future that aggregates all required event types. - GridCompoundIdentityFuture<Object> cf = new GridCompoundIdentityFuture<>(ctx); - - for (IgniteBiTuple<Integer, Integer> t : evtCnts) { - Integer evtType = t.get1(); - Integer expCnt = t.get2(); - - assert expCnt != null && expCnt > 0; - - EventTypeFuture fut = new EventTypeFuture(ctx, evtType, expCnt, partitioned); - - futs.add(fut); - - // We need to account the window. - AtomicInteger cntr = cntrs.get(evtType); - - if (!fut.isDone()) - fut.onEvent(evtType, cntr != null ? cntr.get() : 0); - - cf.add(fut); - } - - cf.markInitialized(); - - try { - cf.get(WAIT_TIMEOUT); - } - catch (IgniteFutureTimeoutException e) { - throw new RuntimeException("Timed out waiting for events: " + cf, e); - } - } - } - - /** - * - */ - private static class EventTypeFuture extends GridFutureAdapter<Object> { - /** */ - private int evtType; - - /** */ - private int expCnt; - - /** */ - private int cnt; - - /** Partitioned flag. */ - private boolean partitioned; - - /** - * For {@link Externalizable}. - */ - public EventTypeFuture() { - // No-op. - } - - /** - * @param ctx Kernal context. - * @param evtType Event type. - * @param expCnt Expected count. - * @param partitioned Partitioned flag. - */ - EventTypeFuture(GridKernalContext ctx, int evtType, int expCnt, boolean partitioned) { - super(ctx); - - assert expCnt > 0; - - this.evtType = evtType; - this.expCnt = expCnt; - this.partitioned = partitioned; - } - - /** - * @return Count. - */ - int count() { - return cnt; - } - - /** - * @param evtType Event type. - * @param cnt Count. - */ - void onEvent(int evtType, int cnt) { - if (isDone() || this.evtType != evtType) - return; - - if (TEST_INFO) - X.println("EventTypeFuture.onEvent() [evtName=" + U.gridEventName(evtType) + ", evtType=" + evtType + - ", cnt=" + cnt + ", expCnt=" + expCnt + ']'); - - this.cnt = cnt; - - - // For partitioned caches we allow extra event for reads. - if (expCnt < cnt && (!partitioned || evtType != EVT_CACHE_OBJECT_READ || expCnt + 1 < cnt)) - onDone(new IgniteCheckedException("Wrong event count [evtName=" + U.gridEventName(evtType) + ", evtType=" + - evtType + ", expCnt=" + expCnt + ", actCnt=" + cnt + ", partitioned=" + partitioned + "]")); - - if (expCnt == cnt || (partitioned && expCnt + 1 == cnt)) - onDone(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(EventTypeFuture.class, this, "evtName", U.gridEventName(evtType)); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiredEntriesPreloadAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiredEntriesPreloadAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiredEntriesPreloadAbstractSelfTest.java deleted file mode 100644 index 090b7dc..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiredEntriesPreloadAbstractSelfTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.testframework.*; - -import javax.cache.expiry.*; -import java.util.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.cache.GridCachePreloadMode.*; -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Tests preloading of expired entries. - */ -public abstract class GridCacheExpiredEntriesPreloadAbstractSelfTest extends GridCacheAbstractSelfTest { - /** */ - private static final int GRID_CNT = 2; - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrid(0); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return GRID_CNT; - } - - /** {@inheritDoc} */ - @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { - CacheConfiguration cfg = super.cacheConfiguration(gridName); - - cfg.setPreloadMode(SYNC); - cfg.setCacheStoreFactory(null); - cfg.setWriteThrough(false); - cfg.setReadThrough(false); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testExpiredEntriesPreloading() throws Exception { - GridCache<String, Integer> cache0 = cache(0); - - final int KEYS_NUM = 3; - - for (int i = 0; i < KEYS_NUM; i++) - cache0.put(String.valueOf(i), 0); - - final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, 100L)); - - IgniteCache cache = grid(0).jcache(null).withExpiryPolicy(expiry); - - for (int i = 0; i < KEYS_NUM; i++) - cache.put(String.valueOf(i), i); - - // Allow entries to expire. - U.sleep(1000); - - // Ensure entries expiration. - for (int i = 0; i < KEYS_NUM; i++) - assert cache0.get(String.valueOf(i)) == null; - - // Start another node. - Ignite g1 = startGrid(1); - - final GridCacheAdapter<String, Integer> cache1 = ((GridKernal)g1).context().cache().internalCache(); - - cache1.preloader().syncFuture().get(); - - Collection<IgniteEvent> evts = g1.events().localQuery(F.<IgniteEvent>alwaysTrue(), EVT_CACHE_PRELOAD_OBJECT_LOADED); - - assertEquals("Expected all entries are preloaded.", KEYS_NUM, evts.size()); - - boolean rmv = GridTestUtils.waitForCondition(new PAX() { - @Override public boolean applyx() { - return cache1.isEmpty(); - } - }, 10_000); - - assertTrue("Expired entries were not removed.", rmv); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheLockAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheLockAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheLockAbstractTest.java deleted file mode 100644 index 03002d5..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheLockAbstractTest.java +++ /dev/null @@ -1,463 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.gridgain.testframework.*; -import org.gridgain.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.cache.GridCachePreloadMode.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; - -/** - * Test cases for multi-threaded tests. - */ -@SuppressWarnings({"FieldCanBeLocal"}) -public abstract class GridCacheLockAbstractTest extends GridCommonAbstractTest { - /** Grid1. */ - private static Ignite ignite1; - - /** Grid2. */ - private static Ignite ignite2; - - /** (for convenience). */ - private static IgniteCache<Integer, String> cache1; - - /** (for convenience). */ - private static IgniteCache<Integer, String> cache2; - - /** Ip-finder. */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** - * - */ - protected GridCacheLockAbstractTest() { - super(false /*start grid. */); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(disco); - - cfg.setCacheConfiguration(cacheConfiguration()); - - return cfg; - } - - /** - * @return Cache configuration. - */ - protected CacheConfiguration cacheConfiguration() { - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setCacheMode(cacheMode()); - cacheCfg.setWriteSynchronizationMode(FULL_ASYNC); - cacheCfg.setPreloadMode(SYNC); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - cacheCfg.setDistributionMode(NEAR_PARTITIONED); - - return cacheCfg; - } - - /** - * @return Cache mode. - */ - protected abstract GridCacheMode cacheMode(); - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - ignite1 = startGrid(1); - ignite2 = startGrid(2); - - cache1 = ignite1.jcache(null); - cache2 = ignite2.jcache(null); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - info("Executing afterTest() callback..."); - - info("Before 1st removeAll()."); - - cache1.removeAll(); - - info("Before 2nd removeAll()."); - - cache2.removeAll(); - - assert cache1.size() == 0 : "Cache is not empty: " + cache1; - assert cache2.size() == 0 : "Cache is not empty: " + cache2; - } - - /** - * @return Partitioned flag. - */ - protected boolean isPartitioned() { - return false; - } - - /** - * @param k Key to check. - * @param idx Grid index. - * @return {@code True} if locked. - */ - private boolean locked(Integer k, int idx) { - if (isPartitioned()) - return near(idx).isLockedNearOnly(k); - - return cache(idx).isLocked(k); - } - - /** - * @param keys Keys to check. - * @param idx Grid index. - * @return {@code True} if locked. - */ - private boolean locked(Iterable<Integer> keys, int idx) { - if (isPartitioned()) - return near(idx).isAllLockedNearOnly(keys); - - for (Integer key : keys) { - if (!cache(idx).isLocked(key)) - return false; - } - - return true; - } - - /** - * @throws Exception If test failed. - */ - @SuppressWarnings({"TooBroadScope"}) - public void testLockSingleThread() throws Exception { - int k = 1; - String v = String.valueOf(k); - - info("Before lock for key: " + k); - - cache1.lock(k).lock(); - - info("After lock for key: " + k); - - try { - assert cache1.isLocked(k); - assert cache1.isLockedByThread(k); - - // Put to cache. - cache1.put(k, v); - - info("Put " + k + '=' + k + " key pair into cache."); - } - finally { - cache1.lock(k).unlock(); - - info("Unlocked key: " + k); - } - - assert !locked(k, 1); - assert !cache1.isLockedByThread(k); - } - - /** - * @throws Exception If test failed. - */ - @SuppressWarnings({"TooBroadScope"}) - public void testLock() throws Exception { - final int kv = 1; - - final CountDownLatch l1 = new CountDownLatch(1); - final CountDownLatch l2 = new CountDownLatch(1); - - GridTestThread t1 = new GridTestThread(new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - info("Before lock for key: " + kv); - - cache1.lock(kv).lock(); - - info("After lock for key: " + kv); - - try { - assert cache1.isLocked(kv); - assert cache1.isLockedByThread(kv); - - l1.countDown(); - - info("Let thread2 proceed."); - - cache1.put(kv, Integer.toString(kv)); - - info("Put " + kv + '=' + Integer.toString(kv) + " key pair into cache."); - } - finally { - Thread.sleep(1000); - - cache1.lockAll(Collections.singleton(kv)).unlock(); - - info("Unlocked key in thread 1: " + kv); - } - - l2.await(); - - assert !cache1.isLockedByThread(kv); - assert !locked(kv, 1); - - return null; - } - }); - - GridTestThread t2 = new GridTestThread(new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - info("Waiting for latch1..."); - - l1.await(); - - cache2.lock(kv).lock(); - - try { - String v = cache2.get(kv); - - assert v != null : "Value is null for key: " + kv; - - assertEquals(Integer.toString(kv), v); - } - finally { - cache2.lockAll(Collections.singleton(kv)).unlock(); - - info("Unlocked key in thread 2: " + kv); - } - - assert !locked(kv, 2); - assert !cache2.isLockedByThread(kv); - - Thread.sleep(1000); - - l2.countDown(); - - return null; - } - }); - - t1.start(); - t2.start(); - - t1.join(); - t2.join(); - - t1.checkError(); - t2.checkError(); - } - - /** - * @throws Exception If test failed. - */ - public void testLockAndPut() throws Exception { - final CountDownLatch l1 = new CountDownLatch(1); - final CountDownLatch l2 = new CountDownLatch(1); - - GridTestThread t1 = new GridTestThread(new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - cache1.lock(1).lock(); - - info("Locked cache key: 1"); - - try { - assert cache1.isLocked(1); - assert cache1.isLockedByThread(1); - - info("Verified that cache key is locked: 1"); - - cache1.put(1, "1"); - - info("Put key value pair into cache: 1='1'"); - - l1.countDown(); - - info("Released latch1"); - - // Hold lock for a bit. - Thread.sleep(50); - - info("Woke up from sleep."); - } - finally { - cache1.lockAll(Collections.singleton(1)).unlock(); - - info("Unlocked cache key: 1"); - } - - l2.await(); - - assert !locked(1, 1); - assert !cache1.isLockedByThread(1); - - return null; - } - }); - - GridTestThread t2 = new GridTestThread(new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - info("Beginning to await on latch 1"); - - l1.await(); - - info("Finished awaiting on latch 1"); - - assertEquals("1", cache1.get(1)); - - info("Retrieved value from cache for key: 1"); - - cache1.put(1, "2"); - - info("Put key-value pair into cache: 1='2'"); - - assertEquals("2", cache1.getAndRemove(1)); - - l2.countDown(); - - info("Removed key from cache: 1"); - - return null; - } - }); - - t1.start(); - t2.start(); - - t1.join(); - t2.join(); - - t1.checkError(); - t2.checkError(); - } - - /** - * @throws Exception If test failed. - */ - @SuppressWarnings({"TooBroadScope"}) - public void testLockTimeoutTwoThreads() throws Exception { - int keyCnt = 1; - - final Set<Integer> keys = new HashSet<>(); - - for (int i = 1; i <= keyCnt; i++) - keys.add(i); - - final CountDownLatch l1 = new CountDownLatch(1); - final CountDownLatch l2 = new CountDownLatch(1); - - IgniteFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - info("Before lock for keys."); - - cache1.lockAll(keys).lock(); - - info("After lock for keys."); - - try { - for (Integer key : keys) { - assert cache1.isLocked(key); - assert cache1.isLockedByThread(key); - } - - l1.countDown(); - - info("Let thread2 proceed."); - - for (int i : keys) { - info("Before put key: " + i); - - cache1.put(i, Integer.toString(i)); - - if (i % 50 == 0) - info("Stored key pairs in cache: " + i); - } - } - finally { - l2.await(); - - info("Before unlock keys in thread 1: " + keys); - - cache1.lockAll(keys).unlock(); - - info("Unlocked entry for keys."); - } - - assert !locked(keys, 1); - - return null; - } - }, 1, "TEST-THREAD-1"); - - IgniteFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - info("Waiting for latch1..."); - - try { - l1.await(); - - // This call should not acquire the lock since - // other thread is holding it. - assert !cache1.lockAll(keys).tryLock(); - - info("Before unlock keys in thread 2: " + keys); - - cache1.lockAll(keys).unlock(); - - // The keys should still be locked. - for (Integer key : keys) - assert cache1.isLocked(key); - } - finally { - l2.countDown(); - } - - return null; - } - }, 1, "TEST-THREAD-2"); - - fut1.get(); - fut2.get(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMixedModeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMixedModeSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMixedModeSelfTest.java deleted file mode 100644 index 299b524..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMixedModeSelfTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.typedef.*; -import org.gridgain.testframework.junits.common.*; - -/** - * Tests cache puts in mixed mode. - */ -public class GridCacheMixedModeSelfTest extends GridCommonAbstractTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setCacheConfiguration(cacheConfiguration(gridName)); - - return cfg; - } - - /** - * @param gridName Grid name. - * @return Cache configuration. - */ - private CacheConfiguration cacheConfiguration(String gridName) { - CacheConfiguration cfg = new CacheConfiguration(); - - cfg.setCacheMode(GridCacheMode.PARTITIONED); - - if (F.eq(gridName, getTestGridName(0))) - cfg.setDistributionMode(GridCacheDistributionMode.NEAR_ONLY); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGrids(4); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** - * @throws Exception If failed. - */ - public void testBasicOps() throws Exception { - GridCache<Object, Object> cache = grid(0).cache(null); - - for (int i = 0; i < 1000; i++) - cache.put(i, i); - - for (int i = 0; i < 1000; i++) - assertEquals(i, cache.get(i)); - - for (int i = 0; i < 1000; i++) - assertEquals(i, cache.remove(i)); - - for (int i = 0; i < 1000; i++) - assertNull(cache.get(i)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheModuloAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheModuloAffinityFunction.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheModuloAffinityFunction.java deleted file mode 100644 index 7eb7e52..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheModuloAffinityFunction.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Affinity which controls where nodes end up using mod operation. - */ -public class GridCacheModuloAffinityFunction implements GridCacheAffinityFunction { - /** Node attribute for index. */ - public static final String IDX_ATTR = "nodeIndex"; - - /** Number of backups. */ - private int backups = -1; - - /** Number of partitions. */ - private int parts = -1; - - /** - * Empty constructor. - */ - public GridCacheModuloAffinityFunction() { - // No-op. - } - - /** - * @param parts Number of partitions. - * @param backups Number of backups. - */ - public GridCacheModuloAffinityFunction(int parts, int backups) { - assert parts > 0; - assert backups >= 0; - - this.parts = parts; - this.backups = backups; - } - - /** - * @param parts Number of partitions. - */ - public void partitions(int parts) { - assert parts > 0; - - this.parts = parts; - } - - /** - * @param backups Number of backups. - */ - public void backups(int backups) { - assert backups >= 0; - - this.backups = backups; - } - - /** - * @return Number of backups. - */ - public int backups() { - return backups; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public List<List<ClusterNode>> assignPartitions(GridCacheAffinityFunctionContext ctx) { - List<List<ClusterNode>> res = new ArrayList<>(parts); - - Collection<ClusterNode> topSnapshot = ctx.currentTopologySnapshot(); - - for (int part = 0; part < parts; part++) { - res.add(F.isEmpty(topSnapshot) ? - Collections.<ClusterNode>emptyList() : - // Wrap affinity nodes with unmodifiable list since unmodifiable generic collection - // doesn't provide equals and hashCode implementations. - U.sealList(nodes(part, topSnapshot))); - } - - return Collections.unmodifiableList(res); - } - - /** {@inheritDoc} */ - public Collection<ClusterNode> nodes(int part, Collection<ClusterNode> nodes) { - List<ClusterNode> sorted = new ArrayList<>(nodes); - - Collections.sort(sorted, new Comparator<ClusterNode>() { - @Override public int compare(ClusterNode n1, ClusterNode n2) { - int idx1 = n1.<Integer>attribute(IDX_ATTR); - int idx2 = n2.<Integer>attribute(IDX_ATTR); - - return idx1 < idx2 ? -1 : idx1 == idx2 ? 0 : 1; - } - }); - - int max = 1 + backups; - - if (max > nodes.size()) - max = nodes.size(); - - Collection<ClusterNode> ret = new ArrayList<>(max); - - Iterator<ClusterNode> it = sorted.iterator(); - - for (int i = 0; i < max; i++) { - ClusterNode n = null; - - if (i == 0) { - while (it.hasNext()) { - n = it.next(); - - int nodeIdx = n.<Integer>attribute(IDX_ATTR); - - if (part <= nodeIdx) - break; - else - n = null; - } - } - else { - if (it.hasNext()) - n = it.next(); - else { - it = sorted.iterator(); - - assert it.hasNext(); - - n = it.next(); - } - } - - assert n != null || nodes.size() < parts; - - if (n == null) - n = (it = sorted.iterator()).next(); - - - ret.add(n); - } - - return ret; - } - - /** - * @param parts Number of partitions. - * @param backups Number of backups. - */ - public void reset(int parts, int backups) { - this.parts = parts; - this.backups = backups; - } - - /** {@inheritDoc} */ - @Override public void reset() { - parts = -1; - backups = -1; - } - - /** {@inheritDoc} */ - @Override public int partitions() { - return parts; - } - - /** {@inheritDoc} */ - @Override public int partition(Object key) { - if (key instanceof Number) - return ((Number)key).intValue() % parts; - - return key == null ? 0 : U.safeAbs(key.hashCode() % parts); - } - - /** {@inheritDoc} */ - @Override public void removeNode(UUID nodeId) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheModuloAffinityFunction.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultiNodeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultiNodeAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultiNodeAbstractTest.java deleted file mode 100644 index 59ce331..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultiNodeAbstractTest.java +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.tostring.*; -import org.gridgain.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Multi-node cache test. - */ -public abstract class GridCacheMultiNodeAbstractTest extends GridCommonAbstractTest { - /** Grid 1. */ - private static Ignite ignite1; - - /** Grid 2. */ - private static Ignite ignite2; - - /** Grid 3. */ - private static Ignite ignite3; - - /** Cache 1. */ - private static GridCache<Integer, String> cache1; - - /** Cache 2. */ - private static GridCache<Integer, String> cache2; - - /** Cache 3. */ - private static GridCache<Integer, String> cache3; - - /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Listeners. */ - private static Collection<CacheEventListener> lsnrs = new ArrayList<>(); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - c.setDiscoverySpi(disco); - - return c; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - ignite1 = startGrid(1); - ignite2 = startGrid(2); - ignite3 = startGrid(3); - - cache1 = ignite1.cache(null); - cache2 = ignite2.cache(null); - cache3 = ignite3.cache(null); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - cache1 = null; - cache2 = null; - cache3 = null; - - ignite1 = null; - ignite2 = null; - ignite3 = null; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - removeListeners(ignite1); - removeListeners(ignite2); - removeListeners(ignite3); - - lsnrs.clear(); - } - - /** - * @param ignite Grid to remove listeners from. - */ - private void removeListeners(Ignite ignite) { - if (ignite != null) - for (CacheEventListener lsnr : lsnrs) { - assert lsnr.latch.getCount() == 0; - - ignite.events().stopLocalListen(lsnr); - } - } - - /** - * - * @param ignite Grid. - * @param lsnr Listener. - * @param type Event types. - */ - private void addListener(Ignite ignite, CacheEventListener lsnr, int... type) { - if (!lsnrs.contains(lsnr)) - lsnrs.add(lsnr); - - ignite.events().localListen(lsnr, type.length == 0 ? EVTS_CACHE : type); - } - - /** - * @throws Exception If test failed. - */ - public void testBasicPut() throws Exception { - checkPuts(3, ignite1); - } - - /** - * @throws Exception If test fails. - */ - public void testMultiNodePut() throws Exception { - checkPuts(1, ignite1, ignite2, ignite3); - checkPuts(1, ignite2, ignite1, ignite3); - checkPuts(1, ignite3, ignite1, ignite2); - } - - /** - * @throws Exception If test fails. - */ - public void testMultiValuePut() throws Exception { - checkPuts(1, ignite1); - } - - /** - * @throws Exception If test fails. - */ - public void testMultiValueMultiNodePut() throws Exception { - checkPuts(3, ignite1, ignite2, ignite3); - checkPuts(3, ignite2, ignite1, ignite3); - checkPuts(3, ignite3, ignite1, ignite2); - } - - /** - * Checks cache puts. - * - * @param cnt Count of puts. - * @param ignites Grids. - * @throws Exception If check fails. - */ - private void checkPuts(int cnt, Ignite... ignites) throws Exception { - CountDownLatch latch = new CountDownLatch(ignites.length * cnt); - - CacheEventListener lsnr = new CacheEventListener(latch, EVT_CACHE_OBJECT_PUT); - - for (Ignite ignite : ignites) - addListener(ignite, lsnr); - - GridCache<Integer, String> cache1 = ignites[0].cache(null); - - for (int i = 1; i <= cnt; i++) - cache1.put(i, "val" + i); - - for (int i = 1; i <= cnt; i++) { - String v = cache1.get(i); - - assert v != null; - assert v.equals("val" + i); - } - - latch.await(10, SECONDS); - - for (Ignite ignite : ignites) { - GridCache<Integer, String> cache = ignite.cache(null); - - if (cache == cache1) - continue; - - for (int i = 1; i <= cnt; i++) { - String v = cache.get(i); - - assert v != null; - assert v.equals("val" + i); - } - } - - assert !cache1.isLocked(1); - assert !cache1.isLocked(2); - assert !cache1.isLocked(3); - - for (Ignite ignite : ignites) - ignite.events().stopLocalListen(lsnr); - } - - /** - * @throws Exception If test failed. - */ - public void testLockUnlock() throws Exception { - CacheEventListener lockLsnr1 = new CacheEventListener(ignite1, new CountDownLatch(1), EVT_CACHE_OBJECT_LOCKED); - - addListener(ignite1, lockLsnr1, EVT_CACHE_OBJECT_LOCKED); - - CacheEventListener unlockLsnr = new CacheEventListener(new CountDownLatch(3), EVT_CACHE_OBJECT_UNLOCKED); - - addListener(ignite1, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED); - addListener(ignite2, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED); - addListener(ignite3, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED); - - IgniteFuture<Boolean> f1 = cache1.lockAsync(1, 0L); - - assert f1.get(10000); - - assert cache1.isLocked(1); - assert cache2.isLocked(1); - assert cache3.isLocked(1); - - assert cache1.isLockedByThread(1); - assert !cache2.isLockedByThread(1); - assert !cache3.isLockedByThread(1); - - info("Acquired lock for cache1."); - - cache1.unlockAll(F.asList(1)); - - Thread.sleep(50); - - unlockLsnr.latch.await(10, SECONDS); - - assert !cache1.isLocked(1); - assert !cache2.isLocked(2); - assert !cache3.isLocked(3); - - assert !cache1.isLockedByThread(1); - assert !cache2.isLockedByThread(1); - assert !cache3.isLockedByThread(1); - } - - /** - * Concurrent test for asynchronous locks. - * - * @throws Exception If test fails. - */ - @SuppressWarnings({"BusyWait"}) - public void testConcurrentLockAsync() throws Exception { - CacheEventListener unlockLsnr = new CacheEventListener(new CountDownLatch(9), EVT_CACHE_OBJECT_UNLOCKED); - - addListener(ignite1, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED); - addListener(ignite2, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED); - addListener(ignite3, unlockLsnr, EVT_CACHE_OBJECT_UNLOCKED); - - IgniteFuture<Boolean> f1 = cache1.lockAsync(1, 0L); - IgniteFuture<Boolean> f2 = cache2.lockAsync(1, 0L); - IgniteFuture<Boolean> f3 = cache3.lockAsync(1, 0L); - - boolean l1 = false; - boolean l2 = false; - boolean l3 = false; - - int cnt = 0; - - while (!l1 || !l2 || !l3) { - if (!l1 && f1.isDone()) { - assert cache1.isLocked(1); - assert cache2.isLocked(1); - assert cache3.isLocked(1); - - assert cache1.isLockedByThread(1); - assert !cache2.isLockedByThread(1); - assert !cache3.isLockedByThread(1); - - info("Acquired lock for cache1."); - - cache1.unlockAll(F.asList(1)); - - l1 = true; - } - - if (!l2 && f2.isDone()) { - assert cache1.isLocked(1); - assert cache2.isLocked(1); - assert cache3.isLocked(1); - - assert !cache1.isLockedByThread(1); - assert cache2.isLockedByThread(1); - assert !cache3.isLockedByThread(1); - - info("Acquired lock for cache2."); - - cache2.unlockAll(F.asList(1)); - - l2 = true; - } - - if (!l3 && f3.isDone()) { - assert cache1.isLocked(1); - assert cache2.isLocked(1); - assert cache3.isLocked(1); - - assert !cache1.isLockedByThread(1); - assert !cache2.isLockedByThread(1); - assert cache3.isLockedByThread(1); - - info("Acquired lock for cache3."); - - cache3.unlockAll(F.asList(1)); - - l3 = true; - } - - info("Acquired locks [cnt=" + ++cnt + ", l1=" + l1 + ", l2=" + l2 + ", l3=" + l3 + ']'); - - Thread.sleep(50); - } - - unlockLsnr.latch.await(10, SECONDS); - - assert !cache1.isLocked(1); - assert !cache2.isLocked(2); - assert !cache3.isLocked(3); - - assert !cache1.isLockedByThread(1); - assert !cache2.isLockedByThread(1); - assert !cache3.isLockedByThread(1); - } - - /** - * @throws Exception If test failed. - */ - public void testConcurrentPutAsync() throws Exception { - CountDownLatch latch = new CountDownLatch(9); - - CacheEventListener lsnr = new CacheEventListener(latch, EVT_CACHE_OBJECT_PUT); - - addListener(ignite1, lsnr); - addListener(ignite2, lsnr); - addListener(ignite3, lsnr); - - IgniteFuture<String> f1 = cache1.putAsync(2, "val1"); - IgniteFuture<String> f2 = cache2.putAsync(2, "val2"); - IgniteFuture<String> f3 = cache3.putAsync(2, "val3"); - - String v1 = f1.get(20000); - - info("Got v1 from future1: " + v1); - - String v2 = f2.get(20000); - - info("Got v2 from future2: " + v2); - - String v3 = f3.get(20000); - - info("Got v3 from future3: " + v3); - - latch.await(60, SECONDS); - - info("Woke up from latch: " + latch); - - v1 = cache1.get(1); - v2 = cache2.get(1); - v3 = cache3.get(1); - - info("Cache1 value for key 1: " + v1); - info("Cache2 value for key 1: " + v2); - info("Cache3 value for key 1: " + v3); - - assert v1 != null; - assert v2 != null; - assert v3 != null; - - assert v1.equals(v2) : "Mismatch [v1=" + v1 + ", v2=" + v2 + ']'; - assert v1.equals(v3) : "Mismatch [v1=" + v1 + ", v3=" + v3 + ']'; - } - - /** - * @throws Exception If test failed. - */ - public void testGlobalClearAll() throws Exception { - cache1.put(1, "val1"); - cache2.put(2, "val2"); - cache3.put(3, "val3"); - - assert cache1.size() == 3; - assert cache2.size() == 3; - assert cache3.size() == 3; - - cache1.globalClearAll(); - - assert cache1.isEmpty(); - assert cache2.isEmpty(); - assert cache3.isEmpty(); - } - - /** - * Event listener. - */ - private class CacheEventListener implements IgnitePredicate<IgniteEvent> { - /** */ - @GridToStringExclude - private final Ignite ignite; - - /** Wait latch. */ - @GridToStringExclude - private CountDownLatch latch; - - /** Events to accept. */ - private final List<Integer> evts; - - /** - * @param latch Wait latch. - * @param evts Events. - */ - CacheEventListener(CountDownLatch latch, Integer... evts) { - this.latch = latch; - - ignite = null; - - assert evts.length > 0; - - this.evts = Arrays.asList(evts); - } - - /** - * @param ignite Grid. - * @param latch Wait latch. - * @param evts Events. - */ - CacheEventListener(Ignite ignite, CountDownLatch latch, Integer... evts) { - this.ignite = ignite; - this.latch = latch; - - assert evts.length > 0; - - this.evts = Arrays.asList(evts); - } - - /** - * @param latch New latch. - */ - void setLatch(CountDownLatch latch) { - this.latch = latch; - } - - /** {@inheritDoc} */ - @Override public boolean apply(IgniteEvent evt) { - info("Grid cache event [type=" + evt.type() + ", latch=" + latch.getCount() + ", evt=" + evt + ']'); - - if (evts.contains(evt.type())) - if (ignite == null || evt.node().id().equals(ignite.cluster().localNode().id())) { - if (latch.getCount() > 0) - latch.countDown(); - else - info("Received unexpected cache event: " + evt); - } - - return true; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheEventListener.class, this, "latchCount", latch.getCount(), - "grid", ignite != null ? ignite.name() : "N/A", "evts", evts); - } - } -}