Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 305f8a2e3 -> 093a2fc94


# ignite-45 - fixing streaming.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/093a2fc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/093a2fc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/093a2fc9

Branch: refs/heads/ignite-45
Commit: 093a2fc94248b4d67cf4c2179f450e40c41b8408
Parents: 305f8a2
Author: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Authored: Sat Mar 21 01:33:24 2015 -0700
Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Committed: Sat Mar 21 01:33:24 2015 -0700

----------------------------------------------------------------------
 .../ignite/examples/java8/ExamplesUtils.java    |   7 +-
 .../java8/streaming/marketdata/CacheConfig.java |  45 ++++++++
 .../java8/streaming/marketdata/Instrument.java  |  66 ++++-------
 .../marketdata/QueryTopInstruments.java         |  70 ++++++++++++
 .../streaming/marketdata/StreamMarketData.java  | 111 +++++++++++++++++++
 .../streaming/numbers/QueryPopularNumbers.java  |   6 +-
 .../streaming/numbers/StreamRandomNumbers.java  |   6 +-
 .../org/apache/ignite/IgniteDataStreamer.java   |   4 +-
 .../configuration/CacheConfiguration.java       |   5 +
 .../org/apache/ignite/stream/StreamVisitor.java |  53 +++++++++
 10 files changed, 316 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
index 5b431bc..0fc7506 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
@@ -111,7 +111,12 @@ public class ExamplesUtils {
                     List<?> l = (List)row;
 
                     for (int i = 0; i < l.size(); i++) {
-                        System.out.print(l.get(i));
+                        Object o = l.get(i);
+
+                        if (o instanceof Double || o instanceof Float)
+                            System.out.printf("%.2f", o);
+                        else
+                            System.out.print(l.get(i));
 
                         if (i + 1 != l.size())
                             System.out.print(',');

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java
new file mode 100644
index 0000000..90bdcf1
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.streaming.marketdata;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Cache configurations for the market data streaming example: one cache
+ * that receives the stream of market ticks and one cache that stores the
+ * financial instruments, indexed for SQL queries.
+ */
+public class CacheConfig {
+    /** Cache name. */
+    public static final String STREAM_NAME = "marketTicks";
+
+    /**
+     * Configure streaming cache.
+     */
+    public static CacheConfiguration<String, MarketTick> marketTicksCache() {
+        return new CacheConfiguration<>("marketTicks");
+    }
+
+    public static CacheConfiguration<String, Instrument> instrumentCache() {
+        CacheConfiguration<String, Instrument> instCache = new 
CacheConfiguration<>("instCache");
+
+        instCache.setIndexedTypes(String.class, Instrument.class);
+
+        return instCache;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
index 3855d19..62b3721 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
@@ -17,24 +17,31 @@
 
 package org.apache.ignite.examples.java8.streaming.marketdata;
 
+import org.apache.ignite.cache.query.annotations.*;
+
+import java.io.*;
+
 /**
- *
+ * Financial instrument.
  */
-public class Instrument {
+public class Instrument implements Serializable {
     /** Instrument symbol. */
+    @QuerySqlField(index = true)
     private final String symbol;
 
     /** Open price. */
-    private volatile double open;
+    @QuerySqlField(index = true)
+    private double open;
 
     /** High price. */
-    private volatile double high;
+    private double high;
 
     /** Low price. */
-    private volatile double low = Long.MAX_VALUE;
+    private double low = Long.MAX_VALUE;
 
     /** Close price. */
-    private volatile double close;
+    @QuerySqlField(index = true)
+    private double latest;
 
     /**
      * @param symbol Symbol.
@@ -44,45 +51,17 @@ public class Instrument {
     }
 
     /**
-     * @return Copy of this instance.
-     */
-    public synchronized Instrument copy() {
-        Instrument res = new Instrument(symbol);
-
-        res.open = open;
-        res.high = high;
-        res.low = low;
-        res.close = close;
-
-        return res;
-    }
-
-    /**
      * Updates this bar with last price.
      *
-     * @param price Price.
-     */
-    public synchronized void update(double price) {
-        if (open == 0)
-            open = price;
-
-        high = Math.max(high, price);
-        low = Math.min(low, price);
-        close = price;
-    }
-
-    /**
-     * Updates this bar with next bar.
-     *
-     * @param instrument Next bar.
+     * @param latest Price.
      */
-    public synchronized void update(Instrument instrument) {
+    public void update(double latest) {
         if (open == 0)
-            open = instrument.open;
+            open = latest;
 
-        high = Math.max(high, instrument.high);
-        low = Math.min(low, instrument.low);
-        close = instrument.close;
+        high = Math.max(high, latest);
+        low = Math.min(low, latest);
+        this.latest = latest;
     }
 
     /**
@@ -116,13 +95,12 @@ public class Instrument {
     /**
      * @return Close price.
      */
-    public double close() {
-        return close;
+    public double latest() {
+        return latest;
     }
 
     /** {@inheritDoc} */
     @Override public synchronized String toString() {
-        return "Bar [symbol=" + symbol + ", open=" + open + ", high=" + high + 
", low=" + low +
-            ", close=" + close + ']';
+        return "Instrument [symbol=" + symbol + ", latest=" + latest + ", 
change=" + (latest - open) + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
new file mode 100644
index 0000000..649ff66
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.streaming.marketdata;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.examples.java8.*;
+
+import java.util.*;
+
+/**
+ * Periodically query the top performing instruments from the instrument cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamMarketData}.</li>
+ *     <li>Start querying top performing instruments using {@link 
QueryTopInstruments}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class QueryTopInstruments {
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            IgniteCache<String, Instrument> instCache = 
ignite.getOrCreateCache(CacheConfig.instrumentCache());
+
+            // Select top 3 instruments.
+            SqlFieldsQuery top3qry = new SqlFieldsQuery(
+                "select symbol, (latest - open) from Instrument order by 
(latest - open) desc limit 3");
+
+            // Query top 3 best performing instruments every 5 seconds.
+            while (true) {
+                // Execute queries.
+                List<List<?>> top3 = instCache.query(top3qry).getAll();
+
+                // Print the header if the first row has a symbol.
+                List<?> row = top3.get(0);
+
+                if (row.get(0) != null)
+                    System.out.println("Top Performing Instruments:");
+
+                // Print top 3 instruments.
+                ExamplesUtils.printQueryResults(top3);
+
+                Thread.sleep(5000);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
new file mode 100644
index 0000000..c2c1c2a
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.streaming.marketdata;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.java8.*;
+import org.apache.ignite.stream.*;
+
+import java.util.*;
+
+/**
+ * Stream market data (ticks) into the streaming cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamMarketData}.</li>
+ *     <li>Start querying top performing instruments using {@link 
QueryTopInstruments}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class StreamMarketData {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Count of total numbers to generate. */
+    private static final int CNT = 10000000;
+
+    /** The list of instruments. */
+    private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", 
"EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"};
+
+    /** The list of initial instrument prices. */
+    private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 
23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50};
+
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // Get or create the cache for market ticks and the cache for 
the instruments updated from them.
+            IgniteCache<String, MarketTick> mktCache = 
ignite.getOrCreateCache(CacheConfig.marketTicksCache());
+            IgniteCache<String, Instrument> instCache = 
ignite.getOrCreateCache(CacheConfig.instrumentCache());
+
+            try (IgniteDataStreamer<String, MarketTick> stmr = 
ignite.dataStreamer(mktCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
+
+                // Note that we receive market data, but do not populate 
'mktCache' (it remains empty).
+                // Instead we update the instruments in the 'instCache'.
+                stmr.receiver(new StreamVisitor<>((cache, e) -> {
+                    String symbol = e.getKey();
+                    MarketTick tick = e.getValue();
+
+                    Instrument inst = instCache.get(symbol);
+
+                    if (inst == null) {
+                        Instrument old = instCache.getAndPutIfAbsent(symbol, 
inst = new Instrument(symbol));
+
+                        if (old != null)
+                            inst = old;
+                    }
+
+                    inst.update(tick.price());
+
+                    instCache.put(symbol, inst);
+                }));
+
+                // Stream market data into market data stream cache.
+                while (true) {
+                    for (int j = 0; j < INSTRUMENTS.length; j++) {
+                        // Use gaussian distribution to ensure that prices
+                        // closer to the initial price have higher probability.
+                        double price = round2(INITIAL_PRICES[j] + 
RAND.nextGaussian());
+
+                        MarketTick tick = new MarketTick(INSTRUMENTS[j], 
price);
+
+                        stmr.addData(tick.symbol(), tick);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Rounds double value to two decimal places.
+     *
+     * @param val value to be rounded.
+     * @return rounded double value.
+     */
+    private static double round2(double val) {
+        return Math.floor(100 * val + 0.5) / 100;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
index 40163c8..f862553 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
@@ -32,11 +32,7 @@ import java.util.*;
  *     <li>Start querying popular numbers using {@link 
QueryPopularNumbers}.</li>
  * </ul>
  * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
- * <p>
- * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will
- * start node with {@code examples/config/example-compute.xml} configuration.
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
  */
 public class QueryPopularNumbers {
     public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
index bfc8f7b..c4e87d6 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
@@ -32,11 +32,7 @@ import java.util.*;
  *     <li>Start querying popular numbers using {@link 
QueryPopularNumbers}.</li>
  * </ul>
  * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
- * <p>
- * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will
- * start node with {@code examples/config/example-compute.xml} configuration.
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
  */
 public class StreamRandomNumbers {
     /** Random number generator. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index 8e81124..4dcb2b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -229,9 +229,9 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
     /**
      * Sets custom stream receiver to this data streamer.
      *
-     * @param updater Stream receiver.
+     * @param rcvr Stream receiver.
      */
-    public void receiver(StreamReceiver<K, V> updater);
+    public void receiver(StreamReceiver<K, V> rcvr);
 
     /**
      * Adds key for removal on remote node. Equivalent to {@link 
#addData(Object, Object) addData(key, null)}.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 12f7040..2233cfa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -331,6 +331,11 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
         /* No-op. */
     }
 
+    /** @param name Cache name. */
+    public CacheConfiguration(String name) {
+        this.name = name;
+    }
+
     /**
      * Copy constructor.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/093a2fc9/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java 
b/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java
new file mode 100644
index 0000000..0474278
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Convenience adapter to visit every key-value tuple in the stream. Note, 
that the visitor
+ * does not update the cache. If the tuple needs to be stored in the cache,
+ * then {@code cache.put(...)} should be called explicitly.
+ */
+public class StreamVisitor<K, V> implements StreamReceiver<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Tuple visitor. */
+    private IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> vis;
+
+    /**
+     * Visitor to visit every stream key-value tuple. Note, that the visitor
+     * does not update the cache. If the tuple needs to be stored in the cache,
+     * then {@code cache.put(...)} should be called explicitly.
+     *
+     * @param vis Stream key-value tuple visitor.
+     */
+    public StreamVisitor(IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> 
vis) {
+        this.vis = vis;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receive(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) throws IgniteException {
+        for (Map.Entry<K, V> entry : entries)
+            vis.apply(cache, entry);
+    }
+}

Reply via email to