# IGNITE-262 - Fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/efcf397e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/efcf397e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/efcf397e Branch: refs/heads/sprint-1 Commit: efcf397e2804f7236e6b9edc874a401fa22f35c9 Parents: 7326485 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sun Feb 15 19:31:14 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sun Feb 15 19:31:14 2015 -0800 ---------------------------------------------------------------------- .../datastructures/IgniteQueueExample.java | 2 +- .../continuous/CacheContinuousQueryManager.java | 34 +++++++++++++++++--- .../internal/util/nio/GridDirectParser.java | 7 +--- .../communication/tcp/TcpCommunicationSpi.java | 7 ++-- 4 files changed, 33 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efcf397e/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteQueueExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteQueueExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteQueueExample.java index 6cdfd15..df043c8 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteQueueExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteQueueExample.java @@ -36,7 +36,7 @@ import java.util.*; */ public class IgniteQueueExample { /** Cache name. */ - private static final String CACHE_NAME = "partitioned_tx"; + private static final String CACHE_NAME = "partitioned"; /** Number of retries */ private static final int RETRIES = 20; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efcf397e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index f0b668d..1347758 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -340,7 +340,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ - private UUID executeQuery0(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, + private UUID executeQuery0(CacheEntryUpdatedListener<K, V> locLsnr, final CacheEntryEventFilter<K, V> rmtFilter, int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean notifyExisting, boolean oldValRequired, boolean sync, boolean ignoreExpired, ClusterGroup grp) throws IgniteCheckedException { cctx.checkSecurity(GridSecurityPermission.CACHE_READ); @@ -405,21 +405,45 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K locLsnr.onUpdated(new Iterable<CacheEntryEvent<? extends K, ? extends V>>() { @Override public Iterator<CacheEntryEvent<? extends K, ? extends V>> iterator() { return new Iterator<CacheEntryEvent<? extends K, ? extends V>>() { + private CacheContinuousQueryEvent<? extends K, ? extends V> next; + + { + advance(); + } + @Override public boolean hasNext() { return it.hasNext(); } @Override public CacheEntryEvent<? extends K, ? extends V> next() { - Cache.Entry<K, V> e = it.next(); + CacheEntryEvent<? extends K, ? extends V> next0 = next; - return new CacheContinuousQueryEvent<>( - cctx.kernalContext().cache().jcache(cctx.name()), CREATED, - new CacheContinuousQueryEntry<>(e.getKey(), e.getValue(), null, null, null)); + advance(); + + return next0; } @Override public void remove() { throw new UnsupportedOperationException(); } + + private void advance() { + next = null; + + while (next == null) { + if (!it.hasNext()) + break; + + Cache.Entry<K, V> e = it.next(); + + next = new CacheContinuousQueryEvent<>( + cctx.kernalContext().cache().jcache(cctx.name()), CREATED, + new CacheContinuousQueryEntry<>(e.getKey(), e.getValue(), null, null, null)); + + if (rmtFilter != null && !rmtFilter.evaluate(next)) + next = null; + } + } }; } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efcf397e/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java index aad19f0..3b00bd9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java @@ -34,18 +34,13 @@ public class GridDirectParser implements GridNioParser { /** */ private final MessageFactory msgFactory; - /** */ - private final MessageFormatter formatter; - /** * @param msgFactory Message factory. - * @param formatter Formatter. */ - public GridDirectParser(MessageFactory msgFactory, MessageFormatter formatter) { + public GridDirectParser(MessageFactory msgFactory) { assert msgFactory != null; this.msgFactory = msgFactory; - this.formatter = formatter; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efcf397e/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 959d094..7843ce3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1504,8 +1504,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; - GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter); - GridNioServer<MessageAdapter> srvr = GridNioServer.<MessageAdapter>builder() .address(locHost) @@ -1523,7 +1521,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .directMode(true) .metricsListener(metricsLsnr) .writeTimeout(sockWriteTimeout) - .filters(new GridNioCodecFilter(parser, log, true), + .filters(new GridNioCodecFilter(new GridDirectParser(messageFactory), log, true), new GridConnectionBytesVerifyFilter(log)) .messageFormatter(messageFormatter) .build(); @@ -2452,8 +2450,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { try { - GridDirectParser parser = new GridDirectParser(getSpiContext().messageFactory(), - getSpiContext().messageFormatter()); + GridDirectParser parser = new GridDirectParser(getSpiContext().messageFactory()); IpcToNioAdapter<MessageAdapter> adapter = new IpcToNioAdapter<>( metricsLsnr,