Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 093a2fc94 -> dfc1a49c6
# ignite-45 - fixing streaming. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a034ed02 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a034ed02 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a034ed02 Branch: refs/heads/ignite-45 Commit: a034ed02d70f45c841401c99a962c077aa27f9fd Parents: 093a2fc Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Sat Mar 21 01:43:40 2015 -0700 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Sat Mar 21 01:43:40 2015 -0700 ---------------------------------------------------------------------- .../java8/streaming/marketdata/StreamMarketData.java | 3 --- .../ignite/examples/java8/streaming/numbers/CacheConfig.java | 8 ++------ .../java8/streaming/numbers/QueryPopularNumbers.java | 2 +- .../java8/streaming/numbers/StreamRandomNumbers.java | 2 +- .../src/main/java/org/apache/ignite/compute/ComputeJob.java | 3 +-- .../java/org/apache/ignite/compute/ComputeJobAdapter.java | 2 +- 6 files changed, 6 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a034ed02/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java index c2c1c2a..10c9caa 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java @@ -60,9 +60,6 @@ public class StreamMarketData { IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); try (IgniteDataStreamer<String, MarketTick> stmr = ignite.dataStreamer(mktCache.getName())) { - // Allow data updates. - stmr.allowOverwrite(true); - // Note that we receive market data, but do not populate 'mktCache' (it remains empty). // Instead we update the instruments in the 'instCache'. stmr.receiver(new StreamVisitor<>((cache, e) -> { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a034ed02/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java index bd6513a..4de5fb6 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java @@ -30,16 +30,12 @@ import static java.util.concurrent.TimeUnit.*; * data older than 1 second will be automatically removed from the cache. */ public class CacheConfig { - /** Cache name. */ - public static final String STREAM_NAME = "randomNumbers"; - /** * Configure streaming cache. */ - public static CacheConfiguration<Integer, Long> configure() { - CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(); + public static CacheConfiguration<Integer, Long> randomNumbersCache() { + CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("randomNumbers"); - cfg.setName(STREAM_NAME); cfg.setIndexedTypes(Integer.class, Long.class); // Sliding window of 1 seconds. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a034ed02/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java index f862553..06aa508 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java @@ -44,7 +44,7 @@ public class QueryPopularNumbers { return; // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.configure()); + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); // Select top 10 words. SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a034ed02/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java index c4e87d6..bf768fb 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java @@ -50,7 +50,7 @@ public class StreamRandomNumbers { return; // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.configure()); + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { // Allow data updates. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a034ed02/modules/core/src/main/java/org/apache/ignite/compute/ComputeJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJob.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJob.java index 87cb10f..ae52912 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJob.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJob.java @@ -18,7 +18,6 @@ package org.apache.ignite.compute; import org.apache.ignite.*; -import org.jetbrains.annotations.*; import java.io.*; import java.util.*; @@ -160,5 +159,5 @@ public interface ComputeJob extends Serializable { * If execution produces a {@link RuntimeException} or {@link Error}, then * it will be wrapped into {@link IgniteCheckedException}. */ - @Nullable public Object execute() throws IgniteException; + public Object execute() throws IgniteException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a034ed02/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobAdapter.java index 71c43b8..80841ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobAdapter.java @@ -129,7 +129,7 @@ public abstract class ComputeJobAdapter implements ComputeJob, Callable<Object> } /** {@inheritDoc} */ - @Nullable @Override public final Object call() { + @Override public final Object call() { return execute(); } }