Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 51f75909b -> b97bc6690
# ignite-45 - copied streaming to java7. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b97bc669 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b97bc669 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b97bc669 Branch: refs/heads/ignite-45 Commit: b97bc6690f60cf8d723c2c656ca21dc3f247965c Parents: 51f7590 Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Sat Mar 21 11:42:32 2015 -0700 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Sat Mar 21 11:42:32 2015 -0700 ---------------------------------------------------------------------- .../java7/streaming/marketdata/CacheConfig.java | 45 ++++++++ .../java7/streaming/marketdata/Instrument.java | 106 +++++++++++++++++++ .../java7/streaming/marketdata/MarketTick.java | 59 +++++++++++ .../marketdata/QueryTopInstruments.java | 73 +++++++++++++ .../streaming/marketdata/StreamMarketData.java | 105 ++++++++++++++++++ .../java7/streaming/numbers/CacheConfig.java | 46 ++++++++ .../streaming/numbers/QueryPopularNumbers.java | 74 +++++++++++++ .../streaming/numbers/StreamRandomNumbers.java | 79 ++++++++++++++ .../java8/streaming/marketdata/Instrument.java | 14 +-- .../marketdata/QueryTopInstruments.java | 9 +- .../streaming/marketdata/StreamMarketData.java | 11 +- 11 files changed, 604 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/CacheConfig.java b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/CacheConfig.java new file mode 100644 index 0000000..9b1c096 --- /dev/null 
+++ b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/CacheConfig.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java7.streaming.marketdata; + +import org.apache.ignite.configuration.*; + +/** + * Configuration for the caches used by the market data streaming example: + * the market ticks cache, which is created with default settings, and the + * instrument cache, which registers String keys and Instrument values as indexed types. + */ +public class CacheConfig { + /** Cache name. */ + public static final String STREAM_NAME = "marketTicks"; + + /** + * Configure streaming cache. 
+ */ + public static CacheConfiguration<String, MarketTick> marketTicksCache() { + return new CacheConfiguration<>("marketTicks"); + } + + public static CacheConfiguration<String, Instrument> instrumentCache() { + CacheConfiguration<String, Instrument> instCache = new CacheConfiguration<>("instCache"); + + instCache.setIndexedTypes(String.class, Instrument.class); + + return instCache; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/Instrument.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/Instrument.java b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/Instrument.java new file mode 100644 index 0000000..e51e4b9 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/Instrument.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java7.streaming.marketdata; + +import org.apache.ignite.cache.query.annotations.*; + +import java.io.*; + +/** + * Financial instrument. 
+ */ +public class Instrument implements Serializable { + /** Instrument symbol. */ + @QuerySqlField(index = true) + private final String symbol; + + /** Open price. */ + @QuerySqlField(index = true) + private double open; + + /** High price. */ + private double high; + + /** Low price. */ + private double low = Long.MAX_VALUE; + + /** Close price. */ + @QuerySqlField(index = true) + private double latest; + + /** + * @param symbol Symbol. + */ + Instrument(String symbol) { + this.symbol = symbol; + } + + /** + * Updates this instrument based on the latest market tick. + * + * @param tick Market tick. + */ + public void update(MarketTick tick) { + if (open == 0) + open = tick.price(); + + high = Math.max(high, tick.price()); + low = Math.min(low, tick.price()); + this.latest = tick.price(); + } + + /** + * @return Symbol. + */ + public String symbol() { + return symbol; + } + + /** + * @return Open price. + */ + public double open() { + return open; + } + + /** + * @return High price. + */ + public double high() { + return high; + } + + /** + * @return Low price. + */ + public double low() { + return low; + } + + /** + * @return Close price. 
+ */ + public double latest() { + return latest; + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" + (latest - open) + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/MarketTick.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/MarketTick.java b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/MarketTick.java new file mode 100644 index 0000000..f38c061 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/MarketTick.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java7.streaming.marketdata; + +import java.io.*; + +/** + * Represents a market tick data. + */ +public class MarketTick implements Serializable { + /** Instrument symbol. */ + private final String symbol; + + /** Price. */ + private final double price; + + /** + * @param symbol Symbol. 
+ * @param price Price. + */ + MarketTick(String symbol, double price) { + this.symbol = symbol; + this.price = price; + } + + /** + * @return Symbol. + */ + public String symbol() { + return symbol; + } + + /** + * @return Price. + */ + public double price() { + return price; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "MarketTick [symbol=" + symbol + ", price=" + price + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/QueryTopInstruments.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/QueryTopInstruments.java b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/QueryTopInstruments.java new file mode 100644 index 0000000..87a1173 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/QueryTopInstruments.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.java7.streaming.marketdata; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.examples.java8.*; + +import java.util.*; + +/** + * Periodically query the top performing instruments from the instrument cache. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamMarketData}.</li> + * <li>Start querying top performing instruments using {@link QueryTopInstruments}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class QueryTopInstruments { + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); + + // Select top 3 instruments. + SqlFieldsQuery top3qry = new SqlFieldsQuery( + "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"); + + // Select total profit. + SqlFieldsQuery profitQry = new SqlFieldsQuery("select sum(latest - open) from Instrument"); + + // Query top 3 best performing instruments every 5 seconds. + while (true) { + // Execute queries. + List<List<?>> top3 = instCache.query(top3qry).getAll(); + List<List<?>> profit = instCache.query(profitQry).getAll(); + + List<?> row = profit.get(0); + + if (row.get(0) != null) + System.out.printf("Total profit: %.2f%n", row.get(0)); + + // Print top 3 instruments. 
+ ExamplesUtils.printQueryResults(top3); + + Thread.sleep(5000); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java new file mode 100644 index 0000000..469f510 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java7.streaming.marketdata; + +import org.apache.ignite.*; +import org.apache.ignite.examples.java8.*; +import org.apache.ignite.stream.*; + +import java.util.*; + +/** + * Stream randomly generated market ticks and update the cached instruments from them. 
+ * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamMarketData}.</li> + * <li>Start querying top performing instruments using {@link QueryTopInstruments}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class StreamMarketData { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Total number of ticks to generate (not used by the endless streaming loop in this class). */ + private static final int CNT = 10000000; + + /** The list of instruments. */ + private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; + + /** The list of initial instrument prices. */ + private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50}; + + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // Get or create the market tick stream cache and the instrument cache. + IgniteCache<String, MarketTick> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache()); + IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); + + try (IgniteDataStreamer<String, MarketTick> stmr = ignite.dataStreamer(mktCache.getName())) { + // Note that we receive market data, but do not populate 'mktCache' (it remains empty). + // Instead we update the instruments in the 'instCache'. 
+ stmr.receiver(new StreamVisitor<>((cache, e) -> { + String symbol = e.getKey(); + MarketTick tick = e.getValue(); + + Instrument inst = instCache.get(symbol); + + if (inst == null) + inst = new Instrument(symbol); + + // Update cached instrument based on the latest market tick. + inst.update(tick); + + instCache.put(symbol, inst); + })); + + // Stream market data into market data stream cache. + while (true) { + for (int j = 0; j < INSTRUMENTS.length; j++) { + // Use gaussian distribution to ensure that + // numbers closer to 0 have higher probability. + double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian()); + + MarketTick tick = new MarketTick(INSTRUMENTS[j], price); + + stmr.addData(tick.symbol(), tick); + } + } + } + } + } + + /** + * Rounds a double value to two decimal places. + * + * @param val value to be rounded. + * @return rounded double value. + */ + private static double round2(double val) { + return Math.floor(100 * val + 0.5) / 100; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/CacheConfig.java b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/CacheConfig.java new file mode 100644 index 0000000..643cc9c --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/CacheConfig.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java7.streaming.numbers; + +import org.apache.ignite.configuration.*; + +import javax.cache.configuration.*; +import javax.cache.expiry.*; + +import static java.util.concurrent.TimeUnit.*; + +/** + * Configuration for the streaming cache to store the stream of random numbers. + * This cache is configured with a sliding window of 1 second, which means that + * data older than 1 second will be automatically removed from the cache. + */ +public class CacheConfig { + /** + * Configure streaming cache. + */ + public static CacheConfiguration<Integer, Long> randomNumbersCache() { + CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("randomNumbers"); + + cfg.setIndexedTypes(Integer.class, Long.class); + + // Sliding window of 1 second. 
+ cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, 1)))); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/QueryPopularNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/QueryPopularNumbers.java b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/QueryPopularNumbers.java new file mode 100644 index 0000000..399f916 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/QueryPopularNumbers.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java7.streaming.numbers; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.examples.java8.*; + +import java.util.*; + +/** + * Periodically query popular numbers from the streaming cache. 
+ * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamRandomNumbers}.</li> + * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class QueryPopularNumbers { + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); + + // Select top 10 numbers. + SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10"); + + // Select average, min, and max counts among all the numbers. + SqlFieldsQuery statsQry = new SqlFieldsQuery("select avg(_val), min(_val), max(_val) from Long"); + + // Query top 10 popular numbers every 5 seconds. + while (true) { + // Execute queries. + List<List<?>> top10 = stmCache.query(top10Qry).getAll(); + List<List<?>> stats = stmCache.query(statsQry).getAll(); + + // Print average, min, and max counts. + List<?> row = stats.get(0); + + if (row.get(0) != null) + System.out.printf("Query results [avg=%.2f, min=%d, max=%d]%n", row.get(0), row.get(1), row.get(2)); + + // Print top 10 numbers. 
+ ExamplesUtils.printQueryResults(top10); + + Thread.sleep(5000); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/StreamRandomNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/StreamRandomNumbers.java b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/StreamRandomNumbers.java new file mode 100644 index 0000000..bbac4d4 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/StreamRandomNumbers.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java7.streaming.numbers; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.examples.java8.*; +import org.apache.ignite.stream.*; + +import javax.cache.processor.*; +import java.util.*; + +/** + * Stream random numbers into the streaming cache. 
+ * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamRandomNumbers}.</li> + * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class StreamRandomNumbers { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); + + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same number. + stmr.receiver(new StreamTransformer<>(new CacheEntryProcessor<Integer, Long, Object>() { + @Override + public Object process(MutableEntry<Integer, Long> e, Object... arg) { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + })); + + // Stream random numbers into the streamer cache. 
+ while (true) + stmr.addData(RAND.nextInt(RANGE), 1L); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java index 62b3721..96a880a 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java @@ -51,17 +51,17 @@ public class Instrument implements Serializable { } /** - * Updates this bar with last price. + * Updates this instrument based on the latest market tick. * - * @param latest Price. + * @param tick Market tick. */ - public void update(double latest) { + public void update(MarketTick tick) { if (open == 0) - open = latest; + open = tick.price(); - high = Math.max(high, latest); - low = Math.min(low, latest); - this.latest = latest; + high = Math.max(high, tick.price()); + low = Math.min(low, tick.price()); + this.latest = tick.price(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java index 649ff66..33feb6a 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java +++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java @@ -49,16 +49,19 @@ public class QueryTopInstruments { SqlFieldsQuery top3qry = new SqlFieldsQuery( "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"); + // Select total profit. + SqlFieldsQuery profitQry = new SqlFieldsQuery("select sum(latest - open) from Instrument"); + // Query top 3 best performing instruments every 5 seconds. while (true) { // Execute queries. List<List<?>> top3 = instCache.query(top3qry).getAll(); + List<List<?>> profit = instCache.query(profitQry).getAll(); - // Print average count. - List<?> row = top3.get(0); + List<?> row = profit.get(0); if (row.get(0) != null) - System.out.println("Top Performing Instruments:"); + System.out.printf("Total profit: %.2f%n", row.get(0)); // Print top 10 words. ExamplesUtils.printQueryResults(top3); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java index 10c9caa..104ed17 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java @@ -68,14 +68,11 @@ public class StreamMarketData { Instrument inst = instCache.get(symbol); - if (inst == null) { - Instrument old = instCache.getAndPutIfAbsent(symbol, inst = new Instrument(symbol)); + if (inst == null) + inst = new Instrument(symbol); - if (old != null) - inst = old; - } - - inst.update(tick.price()); + // Update cached instrument based on the latest market tick. 
+ inst.update(tick); instCache.put(symbol, inst); }));