Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-262 7326485ba -> efcf397e2


# IGNITE-262 - Fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/efcf397e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/efcf397e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/efcf397e

Branch: refs/heads/ignite-262
Commit: efcf397e2804f7236e6b9edc874a401fa22f35c9
Parents: 7326485
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Sun Feb 15 19:31:14 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Sun Feb 15 19:31:14 2015 -0800

----------------------------------------------------------------------
 .../datastructures/IgniteQueueExample.java      |  2 +-
 .../continuous/CacheContinuousQueryManager.java | 34 +++++++++++++++++---
 .../internal/util/nio/GridDirectParser.java     |  7 +---
 .../communication/tcp/TcpCommunicationSpi.java  |  7 ++--
 4 files changed, 33 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efcf397e/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteQueueExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteQueueExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteQueueExample.java
index 6cdfd15..df043c8 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteQueueExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteQueueExample.java
@@ -36,7 +36,7 @@ import java.util.*;
  */
 public class IgniteQueueExample {
     /** Cache name. */
-    private static final String CACHE_NAME = "partitioned_tx";
+    private static final String CACHE_NAME = "partitioned";
 
     /** Number of retries */
     private static final int RETRIES = 20;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efcf397e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index f0b668d..1347758 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -340,7 +340,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    private UUID executeQuery0(CacheEntryUpdatedListener<K, V> locLsnr, 
CacheEntryEventFilter<K, V> rmtFilter,
+    private UUID executeQuery0(CacheEntryUpdatedListener<K, V> locLsnr, final 
CacheEntryEventFilter<K, V> rmtFilter,
         int bufSize, long timeInterval, boolean autoUnsubscribe, boolean 
internal, boolean notifyExisting,
         boolean oldValRequired, boolean sync, boolean ignoreExpired, 
ClusterGroup grp) throws IgniteCheckedException {
         cctx.checkSecurity(GridSecurityPermission.CACHE_READ);
@@ -405,21 +405,45 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
             locLsnr.onUpdated(new Iterable<CacheEntryEvent<? extends K, ? 
extends V>>() {
                 @Override public Iterator<CacheEntryEvent<? extends K, ? 
extends V>> iterator() {
                     return new Iterator<CacheEntryEvent<? extends K, ? extends 
V>>() {
+                        private CacheContinuousQueryEvent<? extends K, ? 
extends V> next;
+
+                        {
+                            advance();
+                        }
+
                         @Override public boolean hasNext() {
                             return it.hasNext();
                         }
 
                         @Override public CacheEntryEvent<? extends K, ? 
extends V> next() {
-                            Cache.Entry<K, V> e = it.next();
+                            CacheEntryEvent<? extends K, ? extends V> next0 = 
next;
 
-                            return new CacheContinuousQueryEvent<>(
-                                
cctx.kernalContext().cache().jcache(cctx.name()), CREATED,
-                                new CacheContinuousQueryEntry<>(e.getKey(), 
e.getValue(), null, null, null));
+                            advance();
+
+                            return next0;
                         }
 
                         @Override public void remove() {
                             throw new UnsupportedOperationException();
                         }
+
+                        private void advance() {
+                            next = null;
+
+                            while (next == null) {
+                                if (!it.hasNext())
+                                    break;
+
+                                Cache.Entry<K, V> e = it.next();
+
+                                next = new CacheContinuousQueryEvent<>(
+                                    
cctx.kernalContext().cache().jcache(cctx.name()), CREATED,
+                                    new 
CacheContinuousQueryEntry<>(e.getKey(), e.getValue(), null, null, null));
+
+                                if (rmtFilter != null && 
!rmtFilter.evaluate(next))
+                                    next = null;
+                            }
+                        }
                     };
                 }
             });

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efcf397e/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index aad19f0..3b00bd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -34,18 +34,13 @@ public class GridDirectParser implements GridNioParser {
     /** */
     private final MessageFactory msgFactory;
 
-    /** */
-    private final MessageFormatter formatter;
-
     /**
      * @param msgFactory Message factory.
-     * @param formatter Formatter.
      */
-    public GridDirectParser(MessageFactory msgFactory, MessageFormatter 
formatter) {
+    public GridDirectParser(MessageFactory msgFactory) {
         assert msgFactory != null;
 
         this.msgFactory = msgFactory;
-        this.formatter = formatter;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efcf397e/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 959d094..7843ce3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1504,8 +1504,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
-                GridDirectParser parser = new GridDirectParser(messageFactory, 
messageFormatter);
-
                 GridNioServer<MessageAdapter> srvr =
                     GridNioServer.<MessageAdapter>builder()
                         .address(locHost)
@@ -1523,7 +1521,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .directMode(true)
                         .metricsListener(metricsLsnr)
                         .writeTimeout(sockWriteTimeout)
-                        .filters(new GridNioCodecFilter(parser, log, true),
+                        .filters(new GridNioCodecFilter(new 
GridDirectParser(messageFactory), log, true),
                             new GridConnectionBytesVerifyFilter(log))
                         .messageFormatter(messageFormatter)
                         .build();
@@ -2452,8 +2450,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             try {
-                GridDirectParser parser = new 
GridDirectParser(getSpiContext().messageFactory(),
-                    getSpiContext().messageFormatter());
+                GridDirectParser parser = new 
GridDirectParser(getSpiContext().messageFactory());
 
                 IpcToNioAdapter<MessageAdapter> adapter = new 
IpcToNioAdapter<>(
                     metricsLsnr,

Reply via email to