# sprint-3 - Updated streaming examples.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b2984c9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b2984c9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b2984c9b

Branch: refs/heads/ignite-sprint-3
Commit: b2984c9bf4a2563cfd943def1c94d68e9b75aef6
Parents: 5a2b46c
Author: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Authored: Mon Apr 6 23:15:55 2015 -0700
Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Committed: Mon Apr 6 23:15:55 2015 -0700

----------------------------------------------------------------------
 examples/README.txt                             |   11 +-
 examples/config/example-ignite.xml              |    2 +-
 .../streaming/marketdata/CacheConfig.java       |   44 -
 .../streaming/marketdata/Instrument.java        |  106 -
 .../marketdata/QueryTopInstruments.java         |   73 -
 .../streaming/marketdata/StreamMarketData.java  |  103 -
 .../examples/streaming/numbers/CacheConfig.java |   46 -
 .../streaming/numbers/QueryPopularNumbers.java  |   74 -
 .../streaming/numbers/StreamRandomNumbers.java  |   78 -
 .../transformers/StreamTransformerExample.java  |  101 +
 .../visitors/StreamVisitorExample.java          |  176 +
 .../streaming/wordcount/CacheConfig.java        |   48 +
 .../streaming/wordcount/QueryWords.java         |   77 +
 .../streaming/wordcount/StreamWords.java        |   68 +
 .../streaming/wordcount/alice-in-wonderland.txt | 3735 ++++++++++++++++++
 .../java8/streaming/marketdata/CacheConfig.java |   44 -
 .../java8/streaming/marketdata/Instrument.java  |  106 -
 .../marketdata/QueryTopInstruments.java         |   73 -
 .../streaming/marketdata/StreamMarketData.java  |  101 -
 .../java8/streaming/numbers/CacheConfig.java    |   46 -
 .../streaming/numbers/QueryPopularNumbers.java  |   74 -
 .../streaming/numbers/StreamRandomNumbers.java  |   74 -
 .../transformers/StreamTransformerExample.java  |   97 +
 .../visitors/StreamVisitorExample.java          |  173 +
 .../ignite/cache/affinity/AffinityUuid.java     |   47 +
 25 files changed, 4532 insertions(+), 1045 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/README.txt
----------------------------------------------------------------------
diff --git a/examples/README.txt b/examples/README.txt
index ce6b238..c75cbb3 100644
--- a/examples/README.txt
+++ b/examples/README.txt
@@ -1,5 +1,5 @@
 Apache Ignite Examples
-----------------------
+======================
 
 This folder contains code examples for various Apache Ignite functionality.
 
@@ -18,8 +18,15 @@ The examples folder contains he following subfolders:
   are excluded by default, enable `java8-examples` Maven profile to include 
them (JDK8 is required).
 - `src/main/scala` - contains examples demonstrating usage of API provided by 
Scalar.
 
+
 Starting Remote Nodes
----------------------
+=====================
 
 Remote nodes for examples should always be started with special configuration 
file which enables P2P
 class loading: `examples/config/example-ignite.xml`. To run a remote node in 
IDE use `ExampleNodeStartup` class.
+
+
+Java7 vs Java8
+==============
+Some examples (not all) which can benefit from Java8 Lambda support were 
re-written with Java8 lambdas.
+For the full set of examples, look at both Java7 and Java8 packages.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/config/example-ignite.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-ignite.xml 
b/examples/config/example-ignite.xml
index e7adb54..e746e59 100644
--- a/examples/config/example-ignite.xml
+++ b/examples/config/example-ignite.xml
@@ -68,7 +68,7 @@
                     -->
                     <!-- Uncomment static IP finder to enable static-based 
discovery of initial nodes. -->
                     <!--<bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
-                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                         <property name="addresses">
                             <list>
                                 <!-- In distributed environment, replace with 
actual host IP address. -->

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java
deleted file mode 100644
index f26ffb7..0000000
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/CacheConfig.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.marketdata;
-
-import org.apache.ignite.configuration.*;
-
-/**
- * Configuration for the streaming caches for market data and financial 
instruments.
- */
-public class CacheConfig {
-    /**
-     * Configure streaming cache for market ticks.
-     */
-    public static CacheConfiguration<String, Double> marketTicksCache() {
-        return new CacheConfiguration<>("marketTicks");
-    }
-
-    /**
-     * Configure cache for financial instruments.
-     */
-    public static CacheConfiguration<String, Instrument> instrumentCache() {
-        CacheConfiguration<String, Instrument> instCache = new 
CacheConfiguration<>("instCache");
-
-        // Index some fields for querying portfolio positions.
-        instCache.setIndexedTypes(String.class, Instrument.class);
-
-        return instCache;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java
deleted file mode 100644
index 6daaffe..0000000
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/Instrument.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.marketdata;
-
-import org.apache.ignite.cache.query.annotations.*;
-
-import java.io.*;
-
-/**
- * Financial instrument.
- */
-public class Instrument implements Serializable {
-    /** Instrument symbol. */
-    @QuerySqlField(index = true)
-    private final String symbol;
-
-    /** Open price. */
-    @QuerySqlField(index = true)
-    private double open;
-
-    /** High price. */
-    private double high;
-
-    /** Low price. */
-    private double low = Long.MAX_VALUE;
-
-    /** Close price. */
-    @QuerySqlField(index = true)
-    private double latest;
-
-    /**
-     * @param symbol Symbol.
-     */
-    Instrument(String symbol) {
-        this.symbol = symbol;
-    }
-
-    /**
-     * Updates this instrument based on the latest price.
-     *
-     * @param price Latest price.
-     */
-    public void update(double price) {
-        if (open == 0)
-            open = price;
-
-        high = Math.max(high, price);
-        low = Math.min(low, price);
-        this.latest = price;
-    }
-
-    /**
-     * @return Symbol.
-     */
-    public String symbol() {
-        return symbol;
-    }
-
-    /**
-     * @return Open price.
-     */
-    public double open() {
-        return open;
-    }
-
-    /**
-     * @return High price.
-     */
-    public double high() {
-        return high;
-    }
-
-    /**
-     * @return Low price.
-     */
-    public double low() {
-        return low;
-    }
-
-    /**
-     * @return Close price.
-     */
-    public double latest() {
-        return latest;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized String toString() {
-        return "Instrument [symbol=" + symbol + ", latest=" + latest + ", 
change=" + (latest - open) + ']';
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/QueryTopInstruments.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/QueryTopInstruments.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/QueryTopInstruments.java
deleted file mode 100644
index 4bcb2ba..0000000
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/QueryTopInstruments.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.marketdata;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.examples.*;
-
-import java.util.*;
-
-/**
- * Periodically query popular numbers from the streaming cache.
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
- *     <li>Start streaming using {@link StreamMarketData}.</li>
- *     <li>Start querying top performing instruments using {@link 
QueryTopInstruments}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
- */
-public class QueryTopInstruments {
-    public static void main(String[] args) throws Exception {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
-            if (!ExamplesUtils.hasServerNodes(ignite))
-                return;
-
-            IgniteCache<String, Instrument> instCache = 
ignite.getOrCreateCache(CacheConfig.instrumentCache());
-
-            // Select top 3 instruments.
-            SqlFieldsQuery top3qry = new SqlFieldsQuery(
-                "select symbol, (latest - open) from Instrument order by 
(latest - open) desc limit 3");
-
-            // Select total profit.
-            SqlFieldsQuery profitQry = new SqlFieldsQuery("select sum(latest - 
open) from Instrument");
-
-            // Query top 3 best performing instruments every 5 seconds.
-            while (true) {
-                // Execute queries.
-                List<List<?>> top3 = instCache.query(top3qry).getAll();
-                List<List<?>> profit = instCache.query(profitQry).getAll();
-
-                List<?> row = profit.get(0);
-
-                if (row.get(0) != null)
-                    System.out.printf("Total profit: %.2f%n", row.get(0));
-
-                // Print top 10 words.
-                ExamplesUtils.printQueryResults(top3);
-
-                Thread.sleep(5000);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java
deleted file mode 100644
index bfac698..0000000
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/marketdata/StreamMarketData.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.marketdata;
-
-import org.apache.ignite.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.stream.*;
-
-import java.util.*;
-
-/**
- * Stream random numbers into the streaming cache.
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
- *     <li>Start streaming using {@link StreamMarketData}.</li>
- *     <li>Start querying top performing instruments using {@link 
QueryTopInstruments}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
- */
-public class StreamMarketData {
-    /** Random number generator. */
-    private static final Random RAND = new Random();
-
-    /** The list of instruments. */
-    private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", 
"EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"};
-
-    /** The list of initial instrument prices. */
-    private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 
23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50};
-
-    public static void main(String[] args) throws Exception {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
-            if (!ExamplesUtils.hasServerNodes(ignite))
-                return;
-
-            // The cache is configured with sliding window holding 1 second of 
the streaming data.
-            IgniteCache<String, Double> mktCache = 
ignite.getOrCreateCache(CacheConfig.marketTicksCache());
-            final IgniteCache<String, Instrument> instCache = 
ignite.getOrCreateCache(CacheConfig.instrumentCache());
-
-            try (IgniteDataStreamer<String, Double> mktStmr = 
ignite.dataStreamer(mktCache.getName())) {
-                // Note that we receive market data, but do not populate 
'mktCache' (it remains empty).
-                // Instead we update the instruments in the 'instCache'.
-                mktStmr.receiver(new StreamVisitor<String, Double>() {
-                    @Override public void apply(IgniteCache<String, Double> 
cache, Map.Entry<String, Double> e) {
-                        String symbol = e.getKey();
-                        Double tick = e.getValue();
-
-                        Instrument inst = instCache.get(symbol);
-
-                        if (inst == null)
-                            inst = new Instrument(symbol);
-
-                        // Don't populate market cache, as we don't use it for 
querying.
-                        // Update cached instrument based on the latest market 
tick.
-                        inst.update(tick);
-
-                        instCache.put(symbol, inst);
-                    }
-                });
-
-                // Stream market data into market data stream cache.
-                while (true) {
-                    for (int j = 0; j < INSTRUMENTS.length; j++) {
-                        // Use gaussian distribution to ensure that
-                        // numbers closer to 0 have higher probability.
-                        double price = round2(INITIAL_PRICES[j] + 
RAND.nextGaussian());
-
-                        mktStmr.addData(INSTRUMENTS[j], price);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Rounds double value to two significant signs.
-     *
-     * @param val value to be rounded.
-     * @return rounded double value.
-     */
-    private static double round2(double val) {
-        return Math.floor(100 * val + 0.5) / 100;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/CacheConfig.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/CacheConfig.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/CacheConfig.java
deleted file mode 100644
index 58592e1..0000000
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/CacheConfig.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.numbers;
-
-import org.apache.ignite.configuration.*;
-
-import javax.cache.configuration.*;
-import javax.cache.expiry.*;
-
-import static java.util.concurrent.TimeUnit.*;
-
-/**
- * Configuration for the streaming cache to store the stream of random numbers.
- * This cache is configured with sliding window of 1 second, which means that
- * data older than 1 second will be automatically removed from the cache.
- */
-public class CacheConfig {
-    /**
-     * Configure streaming cache.
-     */
-    public static CacheConfiguration<Integer, Long> randomNumbersCache() {
-        CacheConfiguration<Integer, Long> cfg = new 
CacheConfiguration<>("randomNumbers");
-
-        cfg.setIndexedTypes(Integer.class, Long.class);
-
-        // Sliding window of 1 seconds.
-        cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new 
CreatedExpiryPolicy(new Duration(SECONDS, 1))));
-
-        return cfg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/QueryPopularNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/QueryPopularNumbers.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/QueryPopularNumbers.java
deleted file mode 100644
index e08ae09..0000000
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/QueryPopularNumbers.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.numbers;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.examples.*;
-
-import java.util.*;
-
-/**
- * Periodically query popular numbers from the streaming cache.
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
- *     <li>Start streaming using {@link StreamRandomNumbers}.</li>
- *     <li>Start querying popular numbers using {@link 
QueryPopularNumbers}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
- */
-public class QueryPopularNumbers {
-    public static void main(String[] args) throws Exception {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
-            if (!ExamplesUtils.hasServerNodes(ignite))
-                return;
-
-            // The cache is configured with sliding window holding 1 second of 
the streaming data.
-            IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
-
-            // Select top 10 words.
-            SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val 
from Long order by _val desc limit 10");
-
-            // Select average, min, and max counts among all the words.
-            SqlFieldsQuery statsQry = new SqlFieldsQuery("select avg(_val), 
min(_val), max(_val) from Long");
-
-            // Query top 10 popular numbers every 5 seconds.
-            while (true) {
-                // Execute queries.
-                List<List<?>> top10 = stmCache.query(top10Qry).getAll();
-                List<List<?>> stats = stmCache.query(statsQry).getAll();
-
-                // Print average count.
-                List<?> row = stats.get(0);
-
-                if (row.get(0) != null)
-                    System.out.printf("Query results [avg=%.2f, min=%d, 
max=%d]%n", row.get(0), row.get(1), row.get(2));
-
-                // Print top 10 words.
-                ExamplesUtils.printQueryResults(top10);
-
-                Thread.sleep(5000);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java
deleted file mode 100644
index 02030fb..0000000
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.numbers;
-
-import org.apache.ignite.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.stream.*;
-
-import javax.cache.processor.*;
-import java.util.*;
-
-/**
- * Stream random numbers into the streaming cache.
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
- *     <li>Start streaming using {@link StreamRandomNumbers}.</li>
- *     <li>Start querying popular numbers using {@link 
QueryPopularNumbers}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
- */
-public class StreamRandomNumbers {
-    /** Random number generator. */
-    private static final Random RAND = new Random();
-
-    /** Range within which to generate numbers. */
-    private static final int RANGE = 1000;
-
-    public static void main(String[] args) throws Exception {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
-            if (!ExamplesUtils.hasServerNodes(ignite))
-                return;
-
-            // The cache is configured with sliding window holding 1 second of 
the streaming data.
-            IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
-
-            try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(stmCache.getName())) {
-                // Allow data updates.
-                stmr.allowOverwrite(true);
-
-                // Configure data transformation to count instances of the 
same number.
-                stmr.receiver(new StreamTransformer<Integer, Long>() {
-                    @Override public Object process(MutableEntry<Integer, 
Long> e, Object... objects)
-                        throws EntryProcessorException {
-                        Long val = e.getValue();
-
-                        e.setValue(val == null ? 1L : val + 1);
-
-                        return null;
-                    }
-                });
-
-                // Stream random numbers into the streamer cache.
-                while (true)
-                    stmr.addData(RAND.nextInt(RANGE), 1L);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java
new file mode 100644
index 0000000..b2fc89d
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.transformers;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.stream.*;
+
+import javax.cache.processor.*;
+import java.util.*;
+
+/**
+ * Stream random numbers into the streaming cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamTransformerExample}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class StreamTransformerExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            CacheConfiguration<Integer, Long> cfg = new 
CacheConfiguration<>("randomNumbers");
+
+            // Index key and value.
+            cfg.setIndexedTypes(Integer.class, Long.class);
+
+            // Auto-close cache at the end of the example.
+            try (IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(cfg)) {
+                try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(stmCache.getName())) {
+                    // Allow data updates.
+                    stmr.allowOverwrite(true);
+
+                    // Configure data transformation to count instances of the 
same number.
+                    stmr.receiver(new StreamTransformer<Integer, Long>() {
+                        @Override
+                        public Object process(MutableEntry<Integer, Long> e, 
Object... args) {
+                            // Get current count.
+                            Long val = e.getValue();
+
+                            // Increment count by 1.
+                            e.setValue(val == null ? 1L : val + 1);
+
+                            return null;
+                        }
+                    });
+
+                    // Stream 10 million random numbers into the streamer 
cache.
+                    for (int i = 1; i <= 10_000_000; i++) {
+                        stmr.addData(RAND.nextInt(RANGE), 1L);
+
+                        if (i % 500_000 == 0)
+                            System.out.println("Number of tuples streamed into 
Ignite: " + i);
+                    }
+                }
+
+                // Query top 10 most popular numbers.
+                SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, 
_val from Long order by _val desc limit 10");
+
+                // Execute queries.
+                List<List<?>> top10 = stmCache.query(top10Qry).getAll();
+
+                System.out.println("Top 10 most popular numbers:");
+
+                // Print top 10 numbers.
+                ExamplesUtils.printQueryResults(top10);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java
new file mode 100644
index 0000000..cd973ea
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.visitors;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.stream.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Streams random market data ticks and uses a {@link StreamVisitor} to update financial instruments.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamVisitorExample}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class StreamVisitorExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** The list of instruments. */
+    private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", 
"EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"};
+
+    /** The list of initial instrument prices. */
+    private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 
23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50};
+
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // Market data cache with default configuration.
+            CacheConfiguration<String, Double> mktDataCfg = new 
CacheConfiguration<String, Double>("marketTicks");
+
+            // Financial instrument cache configuration.
+            CacheConfiguration<String, Instrument> instCfg = new 
CacheConfiguration<>("instCache");
+
+            // Index key and value for querying financial instruments.
+            // Note that Instrument class has @QuerySqlField annotation for 
secondary field indexing.
+            instCfg.setIndexedTypes(String.class, Instrument.class);
+
+            // Auto-close caches at the end of the example.
+            try (
+                IgniteCache<String, Double> mktCache = 
ignite.getOrCreateCache(mktDataCfg);
+                IgniteCache<String, Instrument> instCache = 
ignite.getOrCreateCache(instCfg)
+            ) {
+                try (IgniteDataStreamer<String, Double> mktStmr = 
ignite.dataStreamer(mktCache.getName())) {
+                    // Note that we receive market data, but do not populate 
'mktCache' (it remains empty).
+                    // Instead we update the instruments in the 'instCache'.
+                    // Since both 'instCache' and 'mktCache' use the same 
key, updates are collocated.
+                    mktStmr.receiver(new StreamVisitor<String, Double>() {
+                        @Override
+                        public void apply(IgniteCache<String, Double> cache, 
Map.Entry<String, Double> e) {
+                            String symbol = e.getKey();
+                            Double tick = e.getValue();
+
+                            Instrument inst = instCache.get(symbol);
+
+                            if (inst == null)
+                                inst = new Instrument(symbol);
+
+                            // Don't populate market cache, as we don't use it 
for querying.
+                            // Update cached instrument based on the latest 
market tick.
+                            inst.update(tick);
+
+                            instCache.put(symbol, inst);
+                        }
+                    });
+
+                    // Stream 10 million market data ticks into the system.
+                    for (int i = 1; i <= 10_000_000; i++) {
+                        int idx = RAND.nextInt(INSTRUMENTS.length);
+
+                        // Use gaussian distribution to ensure that
+                        // price changes closer to 0 have higher probability.
+                        double price = round2(INITIAL_PRICES[idx] + 
RAND.nextGaussian());
+
+                        mktStmr.addData(INSTRUMENTS[idx], price);
+
+                        if (i % 500_000 == 0)
+                            System.out.println("Number of tuples streamed into 
Ignite: " + i);
+                    }
+                }
+
+                // Select top 3 best performing instruments.
+                SqlFieldsQuery top3qry = new SqlFieldsQuery(
+                    "select symbol, (latest - open) from Instrument order by 
(latest - open) desc limit 3");
+
+                // Execute query.
+                List<List<?>> top3 = instCache.query(top3qry).getAll();
+
+                System.out.println("Top performing financial instruments: ");
+
+                // Print top 3 instruments.
+                ExamplesUtils.printQueryResults(top3);
+            }
+        }
+    }
+
+    /**
+     * Rounds double value to two decimal places.
+     *
+     * @param val value to be rounded.
+     * @return rounded double value.
+     */
+    private static double round2(double val) {
+        return Math.floor(100 * val + 0.5) / 100;
+    }
+
+    /**
+     * Financial instrument.
+     */
+    public static class Instrument implements Serializable {
+        /** Instrument symbol. */
+        @QuerySqlField(index = true)
+        private final String symbol;
+
+        /** Open price. */
+        @QuerySqlField(index = true)
+        private double open;
+
+        /** Latest price. */
+        @QuerySqlField(index = true)
+        private double latest;
+
+        /**
+         * @param symbol Symbol.
+         */
+        Instrument(String symbol) {
+            this.symbol = symbol;
+        }
+
+        /**
+         * Updates this instrument based on the latest price; while {@code open} is still 0 it is also set to this price.
+         *
+         * @param price Latest price.
+         */
+        public void update(double price) {
+            if (open == 0)
+                open = price;
+
+            this.latest = price;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized String toString() {
+            return "Instrument [symbol=" + symbol + ", latest=" + latest + ", 
change=" + (latest - open) + ']';
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
new file mode 100644
index 0000000..58704ca
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.wordcount;
+
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.configuration.*;
+
+import javax.cache.configuration.*;
+import javax.cache.expiry.*;
+
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * Configuration for the streaming cache to store the stream of words.
+ * This cache is configured with sliding window of 1 second, which means that
+ * data older than 1 second will be automatically removed from the cache.
+ */
+public class CacheConfig {
+    /**
+     * Creates the configuration of the streaming cache named "words".
+     */
+    public static CacheConfiguration<AffinityUuid, String> wordCache() {
+        CacheConfiguration<AffinityUuid, String> cfg = new 
CacheConfiguration<>("words");
+
+        // Index all words streamed into cache.
+        cfg.setIndexedTypes(AffinityUuid.class, String.class);
+
+        // Sliding window of 1 second.
+        cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new 
CreatedExpiryPolicy(new Duration(SECONDS, 1))));
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
new file mode 100644
index 0000000..c32d8e8
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.wordcount;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.examples.*;
+
+import java.util.*;
+
+/**
+ * Periodically query popular words from the streaming cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamWords}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class QueryWords {
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of 
the streaming data.
+            IgniteCache<AffinityUuid, String> stmCache = 
ignite.getOrCreateCache(CacheConfig.wordCache());
+
+            // Select top 10 words.
+            SqlFieldsQuery top10Qry = new SqlFieldsQuery(
+                "select _val, count(_val) as cnt from String group by _val 
order by cnt desc limit 10");
+
+            // Select average, min, and max counts among all the words.
+            SqlFieldsQuery statsQry = new SqlFieldsQuery(
+                "select avg(cnt), min(cnt), max(cnt) from (select count(_val) 
as cnt from String group by _val)");
+
+            // Query top 10 popular words every 5 seconds.
+            while (true) {
+                // Execute queries.
+                List<List<?>> top10 = stmCache.query(top10Qry).getAll();
+                List<List<?>> stats = stmCache.query(statsQry).getAll();
+
+                // Print average, min, and max counts.
+                List<?> row = stats.get(0);
+
+                if (row.get(0) != null)
+                    System.out.printf("Query results [avg=%.2f, min=%d, 
max=%d]%n", row.get(0), row.get(1), row.get(2));
+
+                // Print top 10 words.
+                ExamplesUtils.printQueryResults(top10);
+
+                Thread.sleep(5000);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
new file mode 100644
index 0000000..c59fa51
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.wordcount;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.examples.*;
+
+import java.io.*;
+
+/**
+ * Stream words into Ignite cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamWords}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class StreamWords {
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of 
the streaming data.
+            IgniteCache<AffinityUuid, String> stmCache = 
ignite.getOrCreateCache(CacheConfig.wordCache());
+
+            try (IgniteDataStreamer<AffinityUuid, String> stmr = 
ignite.dataStreamer(stmCache.getName())) {
+                // Stream words from "alice-in-wonderland" book, re-reading it in an endless loop.
+                while (true) {
+                    InputStream in = 
StreamWords.class.getResourceAsStream("alice-in-wonderland.txt");
+
+                    try (LineNumberReader rdr = new LineNumberReader(new 
InputStreamReader(in))) {
+                        for (String line = rdr.readLine(); line != null; line 
= rdr.readLine()) {
+                            for (String word : line.split(" "))
+                                if (!word.isEmpty())
+                                    // Stream words into Ignite.
+                                    // By using AffinityUuid we ensure that 
identical
+                                    // words are processed on the same cluster 
node.
+                                    stmr.addData(new AffinityUuid(word), word);
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

Reply via email to