# ignite-45 - fixing streaming.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8ec30798
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8ec30798
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8ec30798

Branch: refs/heads/ignite-45
Commit: 8ec30798cc26dd8d002a3704bf0e4a1a5ff8e7f2
Parents: 115c712
Author: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Authored: Fri Mar 20 21:33:13 2015 -0700
Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Committed: Fri Mar 20 21:33:13 2015 -0700

----------------------------------------------------------------------
 .../datastructures/IgniteSetExample.java        |   1 -
 .../ignite/examples/java8/ExamplesUtils.java    |   4 +-
 .../java8/streaming/marketdata/Instrument.java  | 128 +++++++++++++++++++
 .../java8/streaming/marketdata/MarketTick.java  |  59 +++++++++
 .../java8/streaming/numbers/CacheConfig.java    |   2 -
 .../streaming/numbers/QueryPopularNumbers.java  |  36 ++++--
 .../streaming/numbers/StreamRandomNumbers.java  |  32 ++---
 .../java/org/apache/ignite/IgniteCache.java     |   2 +-
 8 files changed, 228 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSetExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSetExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSetExample.java
index 1fd936b..cfd7d45 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSetExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSetExample.java
@@ -70,7 +70,6 @@ public class IgniteSetExample {
 
                 clearAndRemoveSet();
             }
-
         }
 
         System.out.println("Ignite set example finished.");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
index 7b36fd9..5b431bc 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
@@ -104,11 +104,9 @@ public class ExamplesUtils {
         if (res == null || res.isEmpty())
             System.out.println("Query result set is empty.");
         else {
-            System.out.println("Query results:");
-
             for (Object row : res) {
                 if (row instanceof List) {
-                    System.out.print("  (");
+                    System.out.print("(");
 
                     List<?> l = (List)row;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
new file mode 100644
index 0000000..3855d19
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.streaming.marketdata;
+
+/**
+ *
+ */
+public class Instrument {
+    /** Instrument symbol. */
+    private final String symbol;
+
+    /** Open price. */
+    private volatile double open;
+
+    /** High price. */
+    private volatile double high;
+
+    /** Low price. */
+    private volatile double low = Long.MAX_VALUE;
+
+    /** Close price. */
+    private volatile double close;
+
+    /**
+     * @param symbol Symbol.
+     */
+    Instrument(String symbol) {
+        this.symbol = symbol;
+    }
+
+    /**
+     * @return Copy of this instance.
+     */
+    public synchronized Instrument copy() {
+        Instrument res = new Instrument(symbol);
+
+        res.open = open;
+        res.high = high;
+        res.low = low;
+        res.close = close;
+
+        return res;
+    }
+
+    /**
+     * Updates this bar with last price.
+     *
+     * @param price Price.
+     */
+    public synchronized void update(double price) {
+        if (open == 0)
+            open = price;
+
+        high = Math.max(high, price);
+        low = Math.min(low, price);
+        close = price;
+    }
+
+    /**
+     * Updates this bar with next bar.
+     *
+     * @param instrument Next bar.
+     */
+    public synchronized void update(Instrument instrument) {
+        if (open == 0)
+            open = instrument.open;
+
+        high = Math.max(high, instrument.high);
+        low = Math.min(low, instrument.low);
+        close = instrument.close;
+    }
+
+    /**
+     * @return Symbol.
+     */
+    public String symbol() {
+        return symbol;
+    }
+
+    /**
+     * @return Open price.
+     */
+    public double open() {
+        return open;
+    }
+
+    /**
+     * @return High price.
+     */
+    public double high() {
+        return high;
+    }
+
+    /**
+     * @return Low price.
+     */
+    public double low() {
+        return low;
+    }
+
+    /**
+     * @return Close price.
+     */
+    public double close() {
+        return close;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized String toString() {
+        return "Bar [symbol=" + symbol + ", open=" + open + ", high=" + high + 
", low=" + low +
+            ", close=" + close + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java
new file mode 100644
index 0000000..7a72189
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/MarketTick.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.streaming.marketdata;
+
+import java.io.*;
+
+/**
+ * Represents a market tick data.
+ */
+public class MarketTick implements Serializable {
+    /** Instrument symbol. */
+    private final String symbol;
+
+    /** Price. */
+    private final double price;
+
+    /**
+     * @param symbol Symbol.
+     * @param price Price.
+     */
+    MarketTick(String symbol, double price) {
+        this.symbol = symbol;
+        this.price = price;
+    }
+
+    /**
+     * @return Symbol.
+     */
+    public String symbol() {
+        return symbol;
+    }
+
+    /**
+     * @return Price.
+     */
+    public double price() {
+        return price;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "MarketTick [symbol=" + symbol + ", price=" + price + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
index ada7932..bd6513a 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.examples.java8.streaming.numbers;
 
-import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 
 import javax.cache.configuration.*;
@@ -40,7 +39,6 @@ public class CacheConfig {
     public static CacheConfiguration<Integer, Long> configure() {
         CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>();
 
-        cfg.setCacheMode(CacheMode.PARTITIONED);
         cfg.setName(STREAM_NAME);
         cfg.setIndexedTypes(Integer.class, Long.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
index 173e153..40163c8 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
@@ -44,24 +44,34 @@ public class QueryPopularNumbers {
         Ignition.setClientMode(true);
 
         try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
             // The cache is configured with sliding window holding 1 second of 
the streaming data.
-            try (IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.configure())) {
-                if (!ExamplesUtils.hasServerNodes(ignite))
-                    return;
+            IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.configure());
+
+            // Select top 10 words.
+            SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val 
from Long order by _val desc limit 10");
+
+            // Select average, min, and max counts among all the words.
+            SqlFieldsQuery statsQry = new SqlFieldsQuery("select avg(_val), 
min(_val), max(_val) from Long");
+
+            // Query top 10 popular numbers every 5 seconds.
+            while (true) {
+                // Execute queries.
+                List<List<?>> top10 = stmCache.query(top10Qry).getAll();
+                List<List<?>> stats = stmCache.query(statsQry).getAll();
 
-                // Query top 10 popular numbers every 5 seconds.
-                while (true) {
-                    // Select top 10 words.
-                    SqlFieldsQuery top10 = new SqlFieldsQuery(
-                        "select _key, _val from Long order by _val desc limit 
10");
+                // Print average count.
+                List<?> row = stats.get(0);
 
-                    // Execute query.
-                    List<List<?>> results = 
stmCache.queryFields(top10).getAll();
+                if (row.get(0) != null)
+                    System.out.printf("Query results [avg=%.2f, min=%d, 
max=%d]%n", row.get(0), row.get(1), row.get(2));
 
-                    ExamplesUtils.printQueryResults(results);
+                // Print top 10 words.
+                ExamplesUtils.printQueryResults(top10);
 
-                    Thread.sleep(5000);
-                }
+                Thread.sleep(5000);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
index cfa006d..bfc8f7b 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
@@ -50,28 +50,28 @@ public class StreamRandomNumbers {
         Ignition.setClientMode(true);
 
         try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
             // The cache is configured with sliding window holding 1 second of 
the streaming data.
-            try (IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.configure())) {
-                if (!ExamplesUtils.hasServerNodes(ignite))
-                    return;
+            IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.configure());
 
-                try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(stmCache.getName())) {
-                    // Allow data updates.
-                    stmr.allowOverwrite(true);
+            try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
 
-                    // Configure data transformation to count instances of the 
same word.
-                    stmr.receiver(new StreamTransformer<>((e, arg) -> {
-                        Long val = e.getValue();
+                // Configure data transformation to count instances of the 
same word.
+                stmr.receiver(new StreamTransformer<>((e, arg) -> {
+                    Long val = e.getValue();
 
-                        e.setValue(val == null ? 1L : val + 1);
+                    e.setValue(val == null ? 1L : val + 1);
 
-                        return null;
-                    }));
+                    return null;
+                }));
 
-                    // Stream random numbers into the streamer cache.
-                    while (true)
-                        stmr.addData(RAND.nextInt(RANGE), 1L);
-                }
+                // Stream random numbers into the streamer cache.
+                while (true)
+                    stmr.addData(RAND.nextInt(RANGE), 1L);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8ec30798/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 5977623..f54597d 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -196,7 +196,7 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
     public boolean isLocalLocked(K key, boolean byCurrThread);
 
     /**
-     * Queries cache. Accepts any subclass of {@link Query}.
+     * Queries cache. Accepts any subclass of {@link Query} interface.
      *
      * @param qry Query.
      * @return Cursor.

Reply via email to