Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-426 88a92639d -> 37c3fe77e


# ignite-426


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/37c3fe77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/37c3fe77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/37c3fe77

Branch: refs/heads/ignite-426
Commit: 37c3fe77ede827cb758c683f15112fa5ec00a846
Parents: 88a9263
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Aug 17 16:17:03 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Aug 17 17:00:40 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   5 +-
 ...acheContinuousQueryFailoverAbstractTest.java | 144 +++++++++++++++++--
 ...inuousQueryFailoverAtomicReplicatedTest.java |   6 +
 3 files changed, 142 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37c3fe77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 82e06db..14173d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -129,7 +129,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     private final boolean rawRetval;
 
     /** Fast map flag. */
-    private boolean fastMap;
+    private final boolean fastMap;
 
     /** */
     private boolean fastMapRemap;
@@ -729,9 +729,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             return;
         }
 
-        if (fastMap && futVer == null)
-            fastMap = cctx.topology().rebalanceFinished(topVer);
-
         if (futVer == null)
             // Assign future version in topology read lock before first exception may be thrown.
             futVer = cctx.versions().next(topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37c3fe77/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 151ae33..ce21968 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -260,7 +260,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         Affinity<Object> aff = qryClient.affinity(null);
 
-        CacheEventListener1 lsnr = new CacheEventListener1();
+        CacheEventListener1 lsnr = new CacheEventListener1(false);
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
@@ -455,7 +455,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         Ignite qryClient = startGrid(2);
 
-        CacheEventListener1 lsnr = new CacheEventListener1();
+        CacheEventListener1 lsnr = new CacheEventListener1(false);
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
@@ -487,7 +487,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
             }
         }, 2000);
 
-        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
 
         if (!latch.await(5, SECONDS))
             fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
@@ -525,7 +525,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     public void testBackupQueueCleanupServerQuery() throws Exception {
         Ignite qryClient = startGridsMultiThreaded(2);
 
-        CacheEventListener1 lsnr = new CacheEventListener1();
+        CacheEventListener1 lsnr = new CacheEventListener1(false);
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
@@ -557,7 +557,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
             }
         }, 3000);
 
-        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
 
         if (!latch.await(5, SECONDS))
             fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
@@ -746,6 +746,105 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testMultiThreaded() throws Exception {
+        final int SRV_NODES = 3;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        final IgniteCache<Object, Object> cache = qryClient.cache(null);
+
+        CacheEventListener1 lsnr = new CacheEventListener1(true);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = cache.query(qry);
+
+        client = false;
+
+        final int SRV_IDX = SRV_NODES - 1;
+
+        List<Integer> keys = primaryKeys(ignite(SRV_IDX).cache(null), 10);
+
+        final int THREADS = 10;
+
+        for (int i = 0; i < keys.size(); i++) {
+            log.info("Iteration: " + i);
+
+            Ignite srv = ignite(SRV_IDX);
+
+            TestCommunicationSpi spi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+            spi.sndFirstOnly = new AtomicBoolean(false);
+
+            final Integer key = keys.get(i);
+
+            final AtomicInteger val = new AtomicInteger();
+
+            CountDownLatch latch = new CountDownLatch(THREADS);
+
+            lsnr.latch = latch;
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    Integer val0 = val.getAndIncrement();
+
+                    cache.put(key, val0);
+
+                    return null;
+                }
+            }, THREADS, "update-thread");
+
+            fut.get();
+
+            stopGrid(SRV_IDX);
+
+            if (!latch.await(5, SECONDS))
+                fail("Failed to wait for notifications [exp=" + THREADS + ", left=" + lsnr.latch.getCount() + ']');
+
+            assertEquals(THREADS, lsnr.allEvts.size());
+
+            Set<Integer> vals = new HashSet<>();
+
+            boolean err = false;
+
+            for (CacheEntryEvent<?, ?> evt : lsnr.allEvts) {
+                assertEquals(key, evt.getKey());
+                assertNotNull(evt.getValue());
+
+                if (!vals.add((Integer)evt.getValue())) {
+                    err = true;
+
+                    log.info("Extra event: " + evt);
+                }
+            }
+
+            for (int v = 0; v < THREADS; v++) {
+                if (!vals.contains(v)) {
+                    err = true;
+
+                    log.info("Event for value not received: " + v);
+                }
+            }
+
+            assertFalse("Invalid events, see log for details.", err);
+
+            lsnr.allEvts.clear();
+
+            startGrid(SRV_IDX);
+        }
+
+        cur.close();
+    }
+
+    /**
      * @param logAll If {@code true} logs all unexpected values.
      * @param expEvts Expected values.
      * @param lsnr Listener.
@@ -822,21 +921,36 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         private ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
 
         /** */
+        private List<CacheEntryEvent<?, ?>> allEvts;
+
+        /** */
         @LoggerResource
         private IgniteLogger log;
 
+        /**
+         * @param saveAll Save all events flag.
+         */
+        CacheEventListener1(boolean saveAll) {
+            if (saveAll)
+                allEvts = new ArrayList<>();
+        }
+
         /** {@inheritDoc} */
         @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
             try {
                 for (CacheEntryEvent<?, ?> evt : evts) {
                     CountDownLatch latch = this.latch;
 
-                    log.info("Received cache event: " + evt + " " + (latch != null ? latch.getCount() : null));
+                    log.info("Received cache event [evt=" + evt +
+                        ", left=" + (latch != null ? latch.getCount() : null) + ']');
 
                     this.evts.put(evt.getKey(), evt);
 
                     keys.add((Integer) evt.getKey());
 
+                    if (allEvts != null)
+                        allEvts.add(evt);
+
                     assertTrue(latch != null);
                     assertTrue(latch.getCount() > 0);
 
@@ -933,17 +1047,29 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         /** */
         private volatile boolean skipMsg;
 
+        /** */
+        private volatile AtomicBoolean sndFirstOnly;
+
         /** {@inheritDoc} */
         @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
-            if (skipMsg && msg instanceof GridIoMessage) {
-                Object msg0 = ((GridIoMessage)msg).message();
+            Object msg0 = ((GridIoMessage)msg).message();
 
-                if (msg0 instanceof GridContinuousMessage) {
+            if (msg0 instanceof GridContinuousMessage) {
+                if (skipMsg) {
                     log.info("Skip continuous message: " + msg0);
 
                     return;
                 }
+                else {
+                    AtomicBoolean sndFirstOnly = this.sndFirstOnly;
+
+                    if (sndFirstOnly != null && !sndFirstOnly.compareAndSet(false, true)) {
+                        log.info("Skip continuous message: " + msg0);
+
+                        return;
+                    }
+                }
             }
 
             super.sendMessage(node, msg, ackC);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37c3fe77/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
index c8209d9..61f0560 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import org.apache.ignite.cache.*;
 
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
 
 /**
@@ -29,4 +30,9 @@ public class CacheContinuousQueryFailoverAtomicReplicatedTest extends CacheConti
     @Override protected CacheMode cacheMode() {
         return REPLICATED;
     }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
+        return PRIMARY;
+    }
 }

Reply via email to