# ignite-45 - fixing streaming.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8ec30798 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8ec30798 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8ec30798 Branch: refs/heads/ignite-45 Commit: 8ec30798cc26dd8d002a3704bf0e4a1a5ff8e7f2 Parents: 115c712 Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Fri Mar 20 21:33:13 2015 -0700 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Fri Mar 20 21:33:13 2015 -0700 ---------------------------------------------------------------------- .../datastructures/IgniteSetExample.java | 1 - .../ignite/examples/java8/ExamplesUtils.java | 4 +- .../java8/streaming/marketdata/Instrument.java | 128 +++++++++++++++++++ .../java8/streaming/marketdata/MarketTick.java | 59 +++++++++ .../java8/streaming/numbers/CacheConfig.java | 2 - .../streaming/numbers/QueryPopularNumbers.java | 36 ++++-- .../streaming/numbers/StreamRandomNumbers.java | 32 ++--- .../java/org/apache/ignite/IgniteCache.java | 2 +- 8 files changed, 228 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSetExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSetExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSetExample.java index 1fd936b..cfd7d45 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSetExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSetExample.java @@ -70,7 +70,6 @@ public class IgniteSetExample { clearAndRemoveSet(); } - } System.out.println("Ignite set example finished."); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java index 7b36fd9..5b431bc 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java @@ -104,11 +104,9 @@ public class ExamplesUtils { if (res == null || res.isEmpty()) System.out.println("Query result set is empty."); else { - System.out.println("Query results:"); - for (Object row : res) { if (row instanceof List) { - System.out.print(" ("); + System.out.print("("); List<?> l = (List)row; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java new file mode 100644 index 0000000..3855d19 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming.marketdata; + +/** + * + */ +public class Instrument { + /** Instrument symbol. */ + private final String symbol; + + /** Open price. */ + private volatile double open; + + /** High price. */ + private volatile double high; + + /** Low price. */ + private volatile double low = Long.MAX_VALUE; + + /** Close price. */ + private volatile double close; + + /** + * @param symbol Symbol. + */ + Instrument(String symbol) { + this.symbol = symbol; + } + + /** + * @return Copy of this instance. + */ + public synchronized Instrument copy() { + Instrument res = new Instrument(symbol); + + res.open = open; + res.high = high; + res.low = low; + res.close = close; + + return res; + } + + /** + * Updates this bar with last price. + * + * @param price Price. + */ + public synchronized void update(double price) { + if (open == 0) + open = price; + + high = Math.max(high, price); + low = Math.min(low, price); + close = price; + } + + /** + * Updates this bar with next bar. + * + * @param instrument Next bar. + */ + public synchronized void update(Instrument instrument) { + if (open == 0) + open = instrument.open; + + high = Math.max(high, instrument.high); + low = Math.min(low, instrument.low); + close = instrument.close; + } + + /** + * @return Symbol. + */ + public String symbol() { + return symbol; + } + + /** + * @return Open price. + */ + public double open() { + return open; + } + + /** + * @return High price. + */ + public double high() { + return high; + } + + /** + * @return Low price. + */ + public double low() { + return low; + } + + /** + * @return Close price. + */ + public double close() { + return close; + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return "Bar [symbol=" + symbol + ", open=" + open + ", high=" + high + ", low=" + low + + ", close=" + close + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java new file mode 100644 index 0000000..7a72189 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming.marketdata; + +import java.io.*; + +/** + * Represents a market tick data. + */ +public class MarketTick implements Serializable { + /** Instrument symbol. */ + private final String symbol; + + /** Price. */ + private final double price; + + /** + * @param symbol Symbol. + * @param price Price. + */ + MarketTick(String symbol, double price) { + this.symbol = symbol; + this.price = price; + } + + /** + * @return Symbol. + */ + public String symbol() { + return symbol; + } + + /** + * @return Price. + */ + public double price() { + return price; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "MarketTick [symbol=" + symbol + ", price=" + price + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java index ada7932..bd6513a 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java @@ -17,7 +17,6 @@ package org.apache.ignite.examples.java8.streaming.numbers; -import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import javax.cache.configuration.*; @@ -40,7 +39,6 @@ public class CacheConfig { public static CacheConfiguration<Integer, Long> configure() { CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(); - cfg.setCacheMode(CacheMode.PARTITIONED); cfg.setName(STREAM_NAME); cfg.setIndexedTypes(Integer.class, Long.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java index 173e153..40163c8 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java @@ -44,24 +44,34 @@ public class QueryPopularNumbers { Ignition.setClientMode(true); try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + // The cache is configured with sliding window holding 1 second of the streaming data. - try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.configure())) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.configure()); + + // Select top 10 words. + SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10"); + + // Select average, min, and max counts among all the words. + SqlFieldsQuery statsQry = new SqlFieldsQuery("select avg(_val), min(_val), max(_val) from Long"); + + // Query top 10 popular numbers every 5 seconds. + while (true) { + // Execute queries. + List<List<?>> top10 = stmCache.query(top10Qry).getAll(); + List<List<?>> stats = stmCache.query(statsQry).getAll(); - // Query top 10 popular numbers every 5 seconds. - while (true) { - // Select top 10 words. - SqlFieldsQuery top10 = new SqlFieldsQuery( - "select _key, _val from Long order by _val desc limit 10"); + // Print average count. + List<?> row = stats.get(0); - // Execute query. - List<List<?>> results = stmCache.queryFields(top10).getAll(); + if (row.get(0) != null) + System.out.printf("Query results [avg=%.2f, min=%d, max=%d]%n", row.get(0), row.get(1), row.get(2)); - ExamplesUtils.printQueryResults(results); + // Print top 10 words. + ExamplesUtils.printQueryResults(top10); - Thread.sleep(5000); - } + Thread.sleep(5000); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java index cfa006d..bfc8f7b 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java @@ -50,28 +50,28 @@ public class StreamRandomNumbers { Ignition.setClientMode(true); try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + // The cache is configured with sliding window holding 1 second of the streaming data. - try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.configure())) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.configure()); - try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { - // Allow data updates. - stmr.allowOverwrite(true); + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); - // Configure data transformation to count instances of the same word. - stmr.receiver(new StreamTransformer<>((e, arg) -> { - Long val = e.getValue(); + // Configure data transformation to count instances of the same word. + stmr.receiver(new StreamTransformer<>((e, arg) -> { + Long val = e.getValue(); - e.setValue(val == null ? 1L : val + 1); + e.setValue(val == null ? 1L : val + 1); - return null; - })); + return null; + })); - // Stream random numbers into the streamer cache. - while (true) - stmr.addData(RAND.nextInt(RANGE), 1L); - } + // Stream random numbers into the streamer cache. + while (true) + stmr.addData(RAND.nextInt(RANGE), 1L); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 5977623..f54597d 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -196,7 +196,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public boolean isLocalLocked(K key, boolean byCurrThread); /** - * Queries cache. Accepts any subclass of {@link Query}. + * Queries cache. Accepts any subclass of {@link Query} interface. * * @param qry Query. * @return Cursor.