Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 305f8a2e3 -> 093a2fc94
# ignite-45 - fixing streaming. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/093a2fc9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/093a2fc9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/093a2fc9 Branch: refs/heads/ignite-45 Commit: 093a2fc94248b4d67cf4c2179f450e40c41b8408 Parents: 305f8a2 Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Sat Mar 21 01:33:24 2015 -0700 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Sat Mar 21 01:33:24 2015 -0700 ---------------------------------------------------------------------- .../ignite/examples/java8/ExamplesUtils.java | 7 +- .../java8/streaming/marketdata/CacheConfig.java | 45 ++++++++ .../java8/streaming/marketdata/Instrument.java | 66 ++++------- .../marketdata/QueryTopInstruments.java | 70 ++++++++++++ .../streaming/marketdata/StreamMarketData.java | 111 +++++++++++++++++++ .../streaming/numbers/QueryPopularNumbers.java | 6 +- .../streaming/numbers/StreamRandomNumbers.java | 6 +- .../org/apache/ignite/IgniteDataStreamer.java | 4 +- .../configuration/CacheConfiguration.java | 5 + .../org/apache/ignite/stream/StreamVisitor.java | 53 +++++++++ 10 files changed, 316 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java index 5b431bc..0fc7506 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java @@ -111,7 +111,12 @@ public class ExamplesUtils { List<?> l = (List)row; for (int i = 0; i < l.size(); i++) { - System.out.print(l.get(i)); + Object o = l.get(i); + + if (o instanceof Double || o instanceof Float) + System.out.printf("%.2f", o); + else + System.out.print(l.get(i)); if (i + 1 != l.size()) System.out.print(','); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java new file mode 100644 index 0000000..90bdcf1 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming.marketdata; + +import org.apache.ignite.configuration.*; + +/** + * Configuration for the streaming cache to store the stream of random numbers. + * This cache is configured with sliding window of 1 second, which means that + * data older than 1 second will be automatically removed from the cache. + */ +public class CacheConfig { + /** Cache name. */ + public static final String STREAM_NAME = "marketTicks"; + + /** + * Configure streaming cache. + */ + public static CacheConfiguration<String, MarketTick> marketTicksCache() { + return new CacheConfiguration<>("marketTicks"); + } + + public static CacheConfiguration<String, Instrument> instrumentCache() { + CacheConfiguration<String, Instrument> instCache = new CacheConfiguration<>("instCache"); + + instCache.setIndexedTypes(String.class, Instrument.class); + + return instCache; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java index 3855d19..62b3721 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java @@ -17,24 +17,31 @@ package org.apache.ignite.examples.java8.streaming.marketdata; +import org.apache.ignite.cache.query.annotations.*; + +import java.io.*; + /** - * + * Financial instrument. */ -public class Instrument { +public class Instrument implements Serializable { /** Instrument symbol. */ + @QuerySqlField(index = true) private final String symbol; /** Open price. */ - private volatile double open; + @QuerySqlField(index = true) + private double open; /** High price. */ - private volatile double high; + private double high; /** Low price. */ - private volatile double low = Long.MAX_VALUE; + private double low = Long.MAX_VALUE; /** Close price. */ - private volatile double close; + @QuerySqlField(index = true) + private double latest; /** * @param symbol Symbol. @@ -44,45 +51,17 @@ public class Instrument { } /** - * @return Copy of this instance. - */ - public synchronized Instrument copy() { - Instrument res = new Instrument(symbol); - - res.open = open; - res.high = high; - res.low = low; - res.close = close; - - return res; - } - - /** * Updates this bar with last price. * - * @param price Price. - */ - public synchronized void update(double price) { - if (open == 0) - open = price; - - high = Math.max(high, price); - low = Math.min(low, price); - close = price; - } - - /** - * Updates this bar with next bar. - * - * @param instrument Next bar. + * @param latest Price. */ - public synchronized void update(Instrument instrument) { + public void update(double latest) { if (open == 0) - open = instrument.open; + open = latest; - high = Math.max(high, instrument.high); - low = Math.min(low, instrument.low); - close = instrument.close; + high = Math.max(high, latest); + low = Math.min(low, latest); + this.latest = latest; } /** @@ -116,13 +95,12 @@ public class Instrument { /** * @return Close price. */ - public double close() { - return close; + public double latest() { + return latest; } /** {@inheritDoc} */ @Override public synchronized String toString() { - return "Bar [symbol=" + symbol + ", open=" + open + ", high=" + high + ", low=" + low + - ", close=" + close + ']'; + return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" + (latest - open) + ']'; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java new file mode 100644 index 0000000..649ff66 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming.marketdata; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.examples.java8.*; + +import java.util.*; + +/** + * Periodically query popular numbers from the streaming cache. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamMarketData}.</li> + * <li>Start querying top performing instruments using {@link QueryTopInstruments}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class QueryTopInstruments { + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); + + // Select top 3 instruments. + SqlFieldsQuery top3qry = new SqlFieldsQuery( + "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"); + + // Query top 3 best performing instruments every 5 seconds. + while (true) { + // Execute queries. + List<List<?>> top3 = instCache.query(top3qry).getAll(); + + // Print average count. + List<?> row = top3.get(0); + + if (row.get(0) != null) + System.out.println("Top Performing Instruments:"); + + // Print top 10 words. + ExamplesUtils.printQueryResults(top3); + + Thread.sleep(5000); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java new file mode 100644 index 0000000..c2c1c2a --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming.marketdata; + +import org.apache.ignite.*; +import org.apache.ignite.examples.java8.*; +import org.apache.ignite.stream.*; + +import java.util.*; + +/** + * Stream random numbers into the streaming cache. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamMarketData}.</li> + * <li>Start querying top performing instruments using {@link QueryTopInstruments}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class StreamMarketData { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Count of total numbers to generate. */ + private static final int CNT = 10000000; + + /** The list of instruments. */ + private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; + + /** The list of initial instrument prices. */ + private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50}; + + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache<String, MarketTick> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache()); + IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); + + try (IgniteDataStreamer<String, MarketTick> stmr = ignite.dataStreamer(mktCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Note that we receive market data, but do not populate 'mktCache' (it remains empty). + // Instead we update the instruments in the 'instCache'. + stmr.receiver(new StreamVisitor<>((cache, e) -> { + String symbol = e.getKey(); + MarketTick tick = e.getValue(); + + Instrument inst = instCache.get(symbol); + + if (inst == null) { + Instrument old = instCache.getAndPutIfAbsent(symbol, inst = new Instrument(symbol)); + + if (old != null) + inst = old; + } + + inst.update(tick.price()); + + instCache.put(symbol, inst); + })); + + // Stream market data into market data stream cache. + while (true) { + for (int j = 0; j < INSTRUMENTS.length; j++) { + // Use gaussian distribution to ensure that + // numbers closer to 0 have higher probability. + double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian()); + + MarketTick tick = new MarketTick(INSTRUMENTS[j], price); + + stmr.addData(tick.symbol(), tick); + } + } + } + } + } + + /** + * Rounds double value to two significant signs. + * + * @param val value to be rounded. + * @return rounded double value. + */ + private static double round2(double val) { + return Math.floor(100 * val + 0.5) / 100; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java index 40163c8..f862553 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java @@ -32,11 +32,7 @@ import java.util.*; * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> * </ul> * <p> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. - * <p> - * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will - * start node with {@code examples/config/example-compute.xml} configuration. + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. */ public class QueryPopularNumbers { public static void main(String[] args) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java index bfc8f7b..c4e87d6 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java @@ -32,11 +32,7 @@ import java.util.*; * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> * </ul> * <p> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. - * <p> - * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will - * start node with {@code examples/config/example-compute.xml} configuration. + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. */ public class StreamRandomNumbers { /** Random number generator. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java index 8e81124..4dcb2b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -229,9 +229,9 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { /** * Sets custom stream receiver to this data streamer. * - * @param updater Stream receiver. + * @param rcvr Stream receiver. */ - public void receiver(StreamReceiver<K, V> updater); + public void receiver(StreamReceiver<K, V> rcvr); /** * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 12f7040..2233cfa 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -331,6 +331,11 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /* No-op. */ } + /** Cache name. */ + public CacheConfiguration(String name) { + this.name = name; + } + /** * Copy constructor. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java new file mode 100644 index 0000000..0474278 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Convenience adapter to visit every key-value tuple in the stream. Note, that the visitor + * does not update the cache. If the tuple needs to be stored in the cache, + * then {@code cache.put(...)} should be called explicitely. + */ +public class StreamVisitor<K, V> implements StreamReceiver<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Tuple visitor. */ + private IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> vis; + + /** + * Visitor to visit every stream key-value tuple. Note, that the visitor + * does not update the cache. If the tuple needs to be stored in the cache, + * then {@code cache.put(...)} should be called explicitely. + * + * @param vis Stream key-value tuple visitor. + */ + public StreamVisitor(IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> vis) { + this.vis = vis; + } + + /** {@inheritDoc} */ + @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException { + for (Map.Entry<K, V> entry : entries) + vis.apply(cache, entry); + } +}