# IGNITE-45 - Examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8f26539f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8f26539f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8f26539f Branch: refs/heads/ignite-45 Commit: 8f26539f8a16cbc52001f5e84199d9e257a3f4b4 Parents: da36a72 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sun Mar 22 14:10:59 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sun Mar 22 14:10:59 2015 -0700 ---------------------------------------------------------------------- .../streaming/marketdata/CacheConfig.java | 2 +- .../streaming/marketdata/Instrument.java | 14 ++--- .../streaming/marketdata/MarketTick.java | 59 -------------------- .../streaming/marketdata/StreamMarketData.java | 18 ++---- .../java8/streaming/marketdata/CacheConfig.java | 2 +- .../java8/streaming/marketdata/Instrument.java | 14 ++--- .../java8/streaming/marketdata/MarketTick.java | 59 -------------------- .../streaming/marketdata/StreamMarketData.java | 10 ++-- 8 files changed, 26 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f26539f/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java index 7a2850e..f26ffb7 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java @@ -26,7 +26,7 @@ public class CacheConfig { /** * Configure streaming cache for market ticks. */ - public static CacheConfiguration<String, MarketTick> marketTicksCache() { + public static CacheConfiguration<String, Double> marketTicksCache() { return new CacheConfiguration<>("marketTicks"); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f26539f/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java index 23f002a..6daaffe 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java @@ -51,17 +51,17 @@ public class Instrument implements Serializable { } /** - * Updates this instrument based on the latest market tick. + * Updates this instrument based on the latest price. * - * @param tick Market tick. + * @param price Latest price. */ - public void update(MarketTick tick) { + public void update(double price) { if (open == 0) - open = tick.price(); + open = price; - high = Math.max(high, tick.price()); - low = Math.min(low, tick.price()); - this.latest = tick.price(); + high = Math.max(high, price); + low = Math.min(low, price); + this.latest = price; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f26539f/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/MarketTick.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/MarketTick.java b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/MarketTick.java deleted file mode 100644 index 949921ba..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/MarketTick.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming.marketdata; - -import java.io.*; - -/** - * Represents a market tick data. - */ -public class MarketTick implements Serializable { - /** Instrument symbol. */ - private final String symbol; - - /** Price. */ - private final double price; - - /** - * @param symbol Symbol. - * @param price Price. - */ - MarketTick(String symbol, double price) { - this.symbol = symbol; - this.price = price; - } - - /** - * @return Symbol. - */ - public String symbol() { - return symbol; - } - - /** - * @return Price. - */ - public double price() { - return price; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "MarketTick [symbol=" + symbol + ", price=" + price + ']'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f26539f/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java index d7363e2..bfac698 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java @@ -38,9 +38,6 @@ public class StreamMarketData { /** Random number generator. */ private static final Random RAND = new Random(); - /** Count of total numbers to generate. */ - private static final int CNT = 10000000; - /** The list of instruments. */ private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; @@ -56,17 +53,16 @@ public class StreamMarketData { return; // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache<String, MarketTick> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache()); + IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache()); final IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); - try (IgniteDataStreamer<String, MarketTick> mktStmr = ignite.dataStreamer(mktCache.getName())) { + try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) { // Note that we receive market data, but do not populate 'mktCache' (it remains empty). // Instead we update the instruments in the 'instCache'. - mktStmr.receiver(new StreamVisitor<String, MarketTick>() { - @Override - public void apply(IgniteCache<String, MarketTick> cache, Map.Entry<String, MarketTick> e) { + mktStmr.receiver(new StreamVisitor<String, Double>() { + @Override public void apply(IgniteCache<String, Double> cache, Map.Entry<String, Double> e) { String symbol = e.getKey(); - MarketTick tick = e.getValue(); + Double tick = e.getValue(); Instrument inst = instCache.get(symbol); @@ -88,9 +84,7 @@ public class StreamMarketData { // numbers closer to 0 have higher probability. double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian()); - MarketTick tick = new MarketTick(INSTRUMENTS[j], price); - - mktStmr.addData(tick.symbol(), tick); + mktStmr.addData(INSTRUMENTS[j], price); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f26539f/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java index 592f5cc..5a36838 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java @@ -26,7 +26,7 @@ public class CacheConfig { /** * Configure streaming cache for market ticks. */ - public static CacheConfiguration<String, MarketTick> marketTicksCache() { + public static CacheConfiguration<String, Double> marketTicksCache() { return new CacheConfiguration<>("marketTicks"); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f26539f/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java index 96a880a..af9d0bf 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java @@ -51,17 +51,17 @@ public class Instrument implements Serializable { } /** - * Updates this instrument based on the latest market tick. + * Updates this instrument based on the latest price. * - * @param tick Market tick. + * @param price Latest price. */ - public void update(MarketTick tick) { + public void update(double price) { if (open == 0) - open = tick.price(); + open = price; - high = Math.max(high, tick.price()); - low = Math.min(low, tick.price()); - this.latest = tick.price(); + high = Math.max(high, price); + low = Math.min(low, price); + this.latest = price; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f26539f/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java deleted file mode 100644 index 7a72189..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.java8.streaming.marketdata; - -import java.io.*; - -/** - * Represents a market tick data. - */ -public class MarketTick implements Serializable { - /** Instrument symbol. */ - private final String symbol; - - /** Price. */ - private final double price; - - /** - * @param symbol Symbol. - * @param price Price. - */ - MarketTick(String symbol, double price) { - this.symbol = symbol; - this.price = price; - } - - /** - * @return Symbol. - */ - public String symbol() { - return symbol; - } - - /** - * @return Price. - */ - public double price() { - return price; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "MarketTick [symbol=" + symbol + ", price=" + price + ']'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f26539f/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java index bfe4802..ced27d2 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java @@ -53,15 +53,15 @@ public class StreamMarketData { return; // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache<String, MarketTick> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache()); + IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache()); IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); - try (IgniteDataStreamer<String, MarketTick> mktStmr = ignite.dataStreamer(mktCache.getName())) { + try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) { // Note that we receive market data, but do not populate 'mktCache' (it remains empty). // Instead we update the instruments in the 'instCache'. mktStmr.receiver(StreamVisitor.from((cache, e) -> { String symbol = e.getKey(); - MarketTick tick = e.getValue(); + Double tick = e.getValue(); Instrument inst = instCache.get(symbol); @@ -81,9 +81,7 @@ public class StreamMarketData { // numbers closer to 0 have higher probability. double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian()); - MarketTick tick = new MarketTick(INSTRUMENTS[j], price); - - mktStmr.addData(tick.symbol(), tick); + mktStmr.addData(INSTRUMENTS[j], price); } } }