Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 51f75909b -> b97bc6690


# ignite-45 - copied streaming to java7.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b97bc669
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b97bc669
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b97bc669

Branch: refs/heads/ignite-45
Commit: b97bc6690f60cf8d723c2c656ca21dc3f247965c
Parents: 51f7590
Author: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Authored: Sat Mar 21 11:42:32 2015 -0700
Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Committed: Sat Mar 21 11:42:32 2015 -0700

----------------------------------------------------------------------
 .../java7/streaming/marketdata/CacheConfig.java |  45 ++++++++
 .../java7/streaming/marketdata/Instrument.java  | 106 +++++++++++++++++++
 .../java7/streaming/marketdata/MarketTick.java  |  59 +++++++++++
 .../marketdata/QueryTopInstruments.java         |  73 +++++++++++++
 .../streaming/marketdata/StreamMarketData.java  | 105 ++++++++++++++++++
 .../java7/streaming/numbers/CacheConfig.java    |  46 ++++++++
 .../streaming/numbers/QueryPopularNumbers.java  |  74 +++++++++++++
 .../streaming/numbers/StreamRandomNumbers.java  |  79 ++++++++++++++
 .../java8/streaming/marketdata/Instrument.java  |  14 +--
 .../marketdata/QueryTopInstruments.java         |   9 +-
 .../streaming/marketdata/StreamMarketData.java  |  11 +-
 11 files changed, 604 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/CacheConfig.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/CacheConfig.java
 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/CacheConfig.java
new file mode 100644
index 0000000..9b1c096
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/CacheConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java7.streaming.marketdata;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Configuration for the caches used by the market data streaming example:
+ * the market ticks cache that the data streamer is attached to, and the
+ * instrument cache, which is indexed so it can be queried with SQL.
+ */
+public class CacheConfig {
+    /** Cache name. */
+    public static final String STREAM_NAME = "marketTicks";
+
+    /**
+     * Configure streaming cache.
+     */
+    public static CacheConfiguration<String, MarketTick> marketTicksCache() {
+        return new CacheConfiguration<>("marketTicks");
+    }
+
+    public static CacheConfiguration<String, Instrument> instrumentCache() {
+        CacheConfiguration<String, Instrument> instCache = new 
CacheConfiguration<>("instCache");
+
+        instCache.setIndexedTypes(String.class, Instrument.class);
+
+        return instCache;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/Instrument.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/Instrument.java
 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/Instrument.java
new file mode 100644
index 0000000..e51e4b9
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/Instrument.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java7.streaming.marketdata;
+
+import org.apache.ignite.cache.query.annotations.*;
+
+import java.io.*;
+
+/**
+ * Financial instrument.
+ */
+public class Instrument implements Serializable {
+    /** Instrument symbol. */
+    @QuerySqlField(index = true)
+    private final String symbol;
+
+    /** Open price. */
+    @QuerySqlField(index = true)
+    private double open;
+
+    /** High price. */
+    private double high;
+
+    /** Low price. */
+    private double low = Long.MAX_VALUE;
+
+    /** Latest price. */
+    @QuerySqlField(index = true)
+    private double latest;
+
+    /**
+     * @param symbol Symbol.
+     */
+    Instrument(String symbol) {
+        this.symbol = symbol;
+    }
+
+    /**
+     * Updates this instrument based on the latest market tick.
+     *
+     * @param tick Market tick.
+     */
+    public void update(MarketTick tick) {
+        if (open == 0)
+            open = tick.price();
+
+        high = Math.max(high, tick.price());
+        low = Math.min(low, tick.price());
+        this.latest = tick.price();
+    }
+
+    /**
+     * @return Symbol.
+     */
+    public String symbol() {
+        return symbol;
+    }
+
+    /**
+     * @return Open price.
+     */
+    public double open() {
+        return open;
+    }
+
+    /**
+     * @return High price.
+     */
+    public double high() {
+        return high;
+    }
+
+    /**
+     * @return Low price.
+     */
+    public double low() {
+        return low;
+    }
+
+    /**
+     * @return Latest price.
+     */
+    public double latest() {
+        return latest;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized String toString() {
+        return "Instrument [symbol=" + symbol + ", latest=" + latest + ", 
change=" + (latest - open) + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/MarketTick.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/MarketTick.java
 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/MarketTick.java
new file mode 100644
index 0000000..f38c061
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/MarketTick.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java7.streaming.marketdata;
+
+import java.io.*;
+
+/**
+ * Represents a single market data tick.
+ */
+public class MarketTick implements Serializable {
+    /** Instrument symbol. */
+    private final String symbol;
+
+    /** Price. */
+    private final double price;
+
+    /**
+     * @param symbol Symbol.
+     * @param price Price.
+     */
+    MarketTick(String symbol, double price) {
+        this.symbol = symbol;
+        this.price = price;
+    }
+
+    /**
+     * @return Symbol.
+     */
+    public String symbol() {
+        return symbol;
+    }
+
+    /**
+     * @return Price.
+     */
+    public double price() {
+        return price;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "MarketTick [symbol=" + symbol + ", price=" + price + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/QueryTopInstruments.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/QueryTopInstruments.java
 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/QueryTopInstruments.java
new file mode 100644
index 0000000..87a1173
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/QueryTopInstruments.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java7.streaming.marketdata;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.examples.java8.*;
+
+import java.util.*;
+
+/**
+ * Periodically query top performing instruments from the instrument cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamMarketData}.</li>
+ *     <li>Start querying top performing instruments using {@link 
QueryTopInstruments}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class QueryTopInstruments {
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            IgniteCache<String, Instrument> instCache = 
ignite.getOrCreateCache(CacheConfig.instrumentCache());
+
+            // Select top 3 instruments.
+            SqlFieldsQuery top3qry = new SqlFieldsQuery(
+                "select symbol, (latest - open) from Instrument order by 
(latest - open) desc limit 3");
+
+            // Select total profit.
+            SqlFieldsQuery profitQry = new SqlFieldsQuery("select sum(latest - 
open) from Instrument");
+
+            // Query top 3 best performing instruments every 5 seconds.
+            while (true) {
+                // Execute queries.
+                List<List<?>> top3 = instCache.query(top3qry).getAll();
+                List<List<?>> profit = instCache.query(profitQry).getAll();
+
+                List<?> row = profit.get(0);
+
+                if (row.get(0) != null)
+                    System.out.printf("Total profit: %.2f%n", row.get(0));
+
+                // Print top 3 instruments.
+                ExamplesUtils.printQueryResults(top3);
+
+                Thread.sleep(5000);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
new file mode 100644
index 0000000..469f510
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java7.streaming.marketdata;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.java8.*;
+import org.apache.ignite.stream.*;
+
+import java.util.*;
+
+/**
+ * Stream randomly generated market data ticks into the streaming cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamMarketData}.</li>
+ *     <li>Start querying top performing instruments using {@link 
QueryTopInstruments}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class StreamMarketData {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Count of total numbers to generate. */
+    private static final int CNT = 10000000;
+
+    /** The list of instruments. */
+    private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", 
"EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"};
+
+    /** The list of initial instrument prices. */
+    private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 
23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50};
+
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // Caches for the incoming market ticks and for the instruments 
updated from those ticks.
+            IgniteCache<String, MarketTick> mktCache = 
ignite.getOrCreateCache(CacheConfig.marketTicksCache());
+            IgniteCache<String, Instrument> instCache = 
ignite.getOrCreateCache(CacheConfig.instrumentCache());
+
+            try (IgniteDataStreamer<String, MarketTick> stmr = 
ignite.dataStreamer(mktCache.getName())) {
+                // Note that we receive market data, but do not populate 
'mktCache' (it remains empty).
+                // Instead we update the instruments in the 'instCache'.
+                stmr.receiver(new StreamVisitor<>((cache, e) -> {
+                    String symbol = e.getKey();
+                    MarketTick tick = e.getValue();
+
+                    Instrument inst = instCache.get(symbol);
+
+                    if (inst == null)
+                        inst = new Instrument(symbol);
+
+                    // Update cached instrument based on the latest market 
tick.
+                    inst.update(tick);
+
+                    instCache.put(symbol, inst);
+                }));
+
+                // Stream market data into market data stream cache.
+                while (true) {
+                    for (int j = 0; j < INSTRUMENTS.length; j++) {
+                        // Use gaussian distribution to ensure that
+                        // prices closer to the initial price have higher probability.
+                        double price = round2(INITIAL_PRICES[j] + 
RAND.nextGaussian());
+
+                        MarketTick tick = new MarketTick(INSTRUMENTS[j], 
price);
+
+                        stmr.addData(tick.symbol(), tick);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Rounds double value to two decimal places.
+     *
+     * @param val value to be rounded.
+     * @return rounded double value.
+     */
+    private static double round2(double val) {
+        return Math.floor(100 * val + 0.5) / 100;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/CacheConfig.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/CacheConfig.java
 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/CacheConfig.java
new file mode 100644
index 0000000..643cc9c
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/CacheConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java7.streaming.numbers;
+
+import org.apache.ignite.configuration.*;
+
+import javax.cache.configuration.*;
+import javax.cache.expiry.*;
+
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * Configuration for the streaming cache to store the stream of random numbers.
+ * This cache is configured with sliding window of 1 second, which means that
+ * data older than 1 second will be automatically removed from the cache.
+ */
+public class CacheConfig {
+    /**
+     * Configure streaming cache.
+     */
+    public static CacheConfiguration<Integer, Long> randomNumbersCache() {
+        CacheConfiguration<Integer, Long> cfg = new 
CacheConfiguration<>("randomNumbers");
+
+        cfg.setIndexedTypes(Integer.class, Long.class);
+
+        // Sliding window of 1 second.
+        cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new 
CreatedExpiryPolicy(new Duration(SECONDS, 1))));
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/QueryPopularNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/QueryPopularNumbers.java
 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/QueryPopularNumbers.java
new file mode 100644
index 0000000..399f916
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/QueryPopularNumbers.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java7.streaming.numbers;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.examples.java8.*;
+
+import java.util.*;
+
+/**
+ * Periodically query popular numbers from the streaming cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamRandomNumbers}.</li>
+ *     <li>Start querying popular numbers using {@link 
QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class QueryPopularNumbers {
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of 
the streaming data.
+            IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
+
+            // Select top 10 popular numbers.
+            SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val 
from Long order by _val desc limit 10");
+
+            // Select average, min, and max counts among all the numbers.
+            SqlFieldsQuery statsQry = new SqlFieldsQuery("select avg(_val), 
min(_val), max(_val) from Long");
+
+            // Query top 10 popular numbers every 5 seconds.
+            while (true) {
+                // Execute queries.
+                List<List<?>> top10 = stmCache.query(top10Qry).getAll();
+                List<List<?>> stats = stmCache.query(statsQry).getAll();
+
+                // Print average, min, and max counts.
+                List<?> row = stats.get(0);
+
+                if (row.get(0) != null)
+                    System.out.printf("Query results [avg=%.2f, min=%d, 
max=%d]%n", row.get(0), row.get(1), row.get(2));
+
+                // Print top 10 popular numbers.
+                ExamplesUtils.printQueryResults(top10);
+
+                Thread.sleep(5000);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/StreamRandomNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/StreamRandomNumbers.java
 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/StreamRandomNumbers.java
new file mode 100644
index 0000000..bbac4d4
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/numbers/StreamRandomNumbers.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java7.streaming.numbers;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.examples.java8.*;
+import org.apache.ignite.stream.*;
+
+import javax.cache.processor.*;
+import java.util.*;
+
+/**
+ * Stream random numbers into the streaming cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamRandomNumbers}.</li>
+ *     <li>Start querying popular numbers using {@link 
QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class StreamRandomNumbers {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of 
the streaming data.
+            IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
+
+            try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
+
+                // Configure data transformation to count instances of the 
same number.
+                stmr.receiver(new StreamTransformer<>(new 
CacheEntryProcessor<Integer, Long, Object>() {
+                    @Override
+                    public Object process(MutableEntry<Integer, Long> e, 
Object... arg) {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                }));
+
+                // Stream random numbers into the streamer cache.
+                while (true)
+                    stmr.addData(RAND.nextInt(RANGE), 1L);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
index 62b3721..96a880a 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
@@ -51,17 +51,17 @@ public class Instrument implements Serializable {
     }
 
     /**
-     * Updates this bar with last price.
+     * Updates this instrument based on the latest market tick.
      *
-     * @param latest Price.
+     * @param tick Market tick.
      */
-    public void update(double latest) {
+    public void update(MarketTick tick) {
         if (open == 0)
-            open = latest;
+            open = tick.price();
 
-        high = Math.max(high, latest);
-        low = Math.min(low, latest);
-        this.latest = latest;
+        high = Math.max(high, tick.price());
+        low = Math.min(low, tick.price());
+        this.latest = tick.price();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
index 649ff66..33feb6a 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
@@ -49,16 +49,19 @@ public class QueryTopInstruments {
             SqlFieldsQuery top3qry = new SqlFieldsQuery(
                 "select symbol, (latest - open) from Instrument order by 
(latest - open) desc limit 3");
 
+            // Select total profit.
+            SqlFieldsQuery profitQry = new SqlFieldsQuery("select sum(latest - 
open) from Instrument");
+
             // Query top 3 best performing instruments every 5 seconds.
             while (true) {
                 // Execute queries.
                 List<List<?>> top3 = instCache.query(top3qry).getAll();
+                List<List<?>> profit = instCache.query(profitQry).getAll();
 
-                // Print average count.
-                List<?> row = top3.get(0);
+                List<?> row = profit.get(0);
 
                 if (row.get(0) != null)
-                    System.out.println("Top Performing Instruments:");
+                    System.out.printf("Total profit: %.2f%n", row.get(0));
 
                 // Print top 10 words.
                 ExamplesUtils.printQueryResults(top3);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97bc669/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
index 10c9caa..104ed17 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
@@ -68,14 +68,11 @@ public class StreamMarketData {
 
                     Instrument inst = instCache.get(symbol);
 
-                    if (inst == null) {
-                        Instrument old = instCache.getAndPutIfAbsent(symbol, 
inst = new Instrument(symbol));
+                    if (inst == null)
+                        inst = new Instrument(symbol);
 
-                        if (old != null)
-                            inst = old;
-                    }
-
-                    inst.update(tick.price());
+                    // Update cached instrument based on the latest market 
tick.
+                    inst.update(tick);
 
                     instCache.put(symbol, inst);
                 }));

Reply via email to