# ignite-45 - fixing streaming.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d49f9c88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d49f9c88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d49f9c88

Branch: refs/heads/ignite-501
Commit: d49f9c88506b39e384b2697a33f075ceaed66cef
Parents: 888f0a3
Author: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Authored: Thu Mar 19 01:35:27 2015 -0400
Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Committed: Thu Mar 19 01:35:27 2015 -0400

----------------------------------------------------------------------
 .../ignite/examples/java8/ExamplesUtils.java    |  64 ++++++--
 .../StreamingPopularNumbersExample.java         | 163 -------------------
 .../java8/streaming/numbers/CacheConfig.java    |  33 ++++
 .../streaming/numbers/QueryPopularNumbers.java  |  67 ++++++++
 .../streaming/numbers/StreamRandomNumbers.java  |  79 +++++++++
 .../apache/ignite/stream/StreamTransformer.java |   9 +-
 6 files changed, 234 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
index 5b62bd4..5854cdd 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
@@ -19,9 +19,9 @@ package org.apache.ignite.examples.java8;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
-import org.apache.ignite.streamer.*;
 
 import java.net.*;
+import java.util.*;
 
 /**
  *
@@ -63,17 +63,15 @@ public class ExamplesUtils {
     /**
      * Checks minimum topology size for running a certain example.
      *
-     * @param prj Cluster to check size for.
+     * @param grp Cluster to check size for.
      * @param size Minimum number of nodes required to run a certain example.
      * @return {@code True} if check passed, {@code false} otherwise.
      */
-    public static boolean checkMinTopologySize(ClusterGroup prj, int size) {
-        int prjSize = prj.nodes().size();
+    public static boolean checkMinTopologySize(ClusterGroup grp, int size) {
+        int prjSize = grp.nodes().size();
 
         if (prjSize < size) {
-            System.out.println();
-            System.out.println(">>> Please start at least " + size + " cluster 
nodes to run example.");
-            System.out.println();
+            System.err.println(">>> Please start at least " + size + " cluster 
nodes to run example.");
 
             return false;
         }
@@ -82,18 +80,50 @@ public class ExamplesUtils {
     }
 
     /**
-     * @param ignite Ignite.
-     * @param name Streamer name.
-     * @return {@code True} if ignite has streamer with given name.
+     * Checks if cluster has server nodes.
+     *
+     * @param ignite Ignite instance.
+     * @return {@code True} if cluster has server nodes, {@code false} 
otherwise.
      */
-    public static boolean hasStreamer(Ignite ignite, String name) {
-        if (ignite.configuration().getStreamerConfiguration() != null) {
-            for (StreamerConfiguration cfg : 
ignite.configuration().getStreamerConfiguration()) {
-                if (name.equals(cfg.getName()))
-                    return true;
-            }
+    public static boolean hasServerNodes(Ignite ignite) {
+        if (ignite.cluster().forServers().nodes().isEmpty()) {
+            System.err.println("Server nodes not found (start data nodes with 
ExampleNodeStartup class)");
+
+            return false;
         }
 
-        return false;
+        return true;
+    }
+
+    /**
+     * Convenience method for printing query results.
+     *
+     * @param res Query results.
+     */
+    public static void printQueryResults(List<?> res) {
+        if (res == null)
+            System.out.println("Query result set is empty.");
+        else {
+            System.out.println("Query results:");
+
+            for (Object row : res) {
+                if (row instanceof List) {
+                    System.out.print("  (");
+
+                    List<?> l = (List)row;
+
+                    for (int i = 0; i < l.size(); i++) {
+                        System.out.print(l.get(i));
+
+                        if (i + 1 != l.size())
+                            System.out.print(',');
+                    }
+
+                    System.out.println(')');
+                }
+                else
+                    System.out.println("  " + row);
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java
deleted file mode 100644
index 3b33402..0000000
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.java8.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.examples.java8.*;
-import org.apache.ignite.stream.*;
-
-import javax.cache.*;
-import javax.cache.configuration.*;
-import javax.cache.expiry.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.util.concurrent.TimeUnit.*;
-
-/**
- * Real time popular numbers counter.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
- * <p>
- * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will
- * start node with {@code examples/config/example-compute.xml} configuration.
- */
-public class StreamingPopularNumbersExample {
-    /** Cache name. */
-    private static final String STREAM_NAME = 
StreamingPopularNumbersExample.class.getSimpleName();
-
-    /** Random number generator. */
-    private static final Random RAND = new Random();
-
-    /** Range within which to generate numbers. */
-    private static final int RANGE = 1000;
-
-    /** Test duration. */
-    private static final long DURATION = 2 * 60 * 1000;
-
-    /** Flag indicating that the test is finished. */
-    private static volatile boolean finished = false;
-
-    /**
-     * Executes example.
-     *
-     * @param args Command line arguments, none required.
-     * @throws IgniteException If example execution failed.
-     */
-    public static void main(String[] args) throws Exception {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = 
Ignition.start("examples/config/example-compute.xml")) {
-            System.out.println();
-            System.out.println(">>> Cache popular numbers example started.");
-
-            /*
-             * Configure streaming cache.
-             * =========================
-             */
-            CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>();
-
-            cfg.setCacheMode(CacheMode.PARTITIONED);
-            cfg.setName(STREAM_NAME);
-            cfg.setIndexedTypes(Integer.class, Long.class);
-
-            // Sliding window of 1 seconds.
-            cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new 
CreatedExpiryPolicy(new Duration(SECONDS, 1))));
-
-            /**
-             * Start the streaming cache on all server nodes.
-             * ============================================
-             */
-            try (IgniteCache<Integer, Long> stmCache = 
ignite.createCache(cfg)) {
-                // Check that that server nodes have been started.
-                if 
(ignite.cluster().forDataNodes(STREAM_NAME).nodes().isEmpty()) {
-                    System.err.println("Data nodes not found (start data nodes 
with ExampleNodeStartup class)");
-
-                    return;
-                }
-
-                ExecutorService exe = startStreaming(ignite);
-
-                long start = System.currentTimeMillis();
-
-                while (System.currentTimeMillis() - start < DURATION) {
-                    // Select top 10 words.
-                    SqlFieldsQuery top10 = new SqlFieldsQuery(
-                        "select _key, _val from Long order by _val desc limit 
10");
-
-                    List<List<?>> results = 
stmCache.queryFields(top10).getAll();
-
-                    for (List<?> res : results)
-                        System.out.println(res.get(0) + "=" + res.get(1));
-
-                    System.out.println("----------------");
-
-                    Thread.sleep(5000);
-                }
-
-                finished = true;
-
-                exe.shutdown();
-            }
-            catch (CacheException e) {
-                e.printStackTrace();
-
-                System.out.println("Destroying cache for name '" + STREAM_NAME 
+ "'. Please try again.");
-
-                ignite.destroyCache(STREAM_NAME);
-            }
-        }
-    }
-
-    /**
-     * Populates the streaming cache in real time with numbers and keeps count 
for every number.
-     *
-     * @param ignite Ignite.
-     */
-    private static ExecutorService startStreaming(final Ignite ignite) {
-        ExecutorService exe = Executors.newSingleThreadExecutor();
-
-        // Stream random numbers from another thread.
-        exe.submit(() -> {
-            try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(STREAM_NAME)) {
-                // Allow data updates.
-                stmr.allowOverwrite(true);
-
-                // Configure data transformation to count instances of the 
same word.
-                stmr.receiver(new StreamTransformer<>((e, args) -> {
-                    Long val = e.getValue();
-
-                    e.setValue(val == null ? 1L : val + 1);
-
-                    return null;
-                }));
-
-
-                while (!finished)
-                    stmr.addData(RAND.nextInt(RANGE), 1L);
-            }
-        });
-
-        return exe;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
new file mode 100644
index 0000000..76f50b1
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
@@ -0,0 +1,33 @@
+package org.apache.ignite.examples.java8.streaming.numbers;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+
+import javax.cache.configuration.*;
+import javax.cache.expiry.*;
+
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * Created by Dmitriy on 3/18/15.
+ */
+public class CacheConfig {
+    /** Cache name. */
+    public static final String STREAM_NAME = "randomNumbers";
+
+    /**
+     * Configure streaming cache.
+     */
+    public static CacheConfiguration<Integer, Long> configure() {
+        CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>();
+
+        cfg.setCacheMode(CacheMode.PARTITIONED);
+        cfg.setName(STREAM_NAME);
+        cfg.setIndexedTypes(Integer.class, Long.class);
+
+        // Sliding window of 1 seconds.
+        cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new 
CreatedExpiryPolicy(new Duration(SECONDS, 1))));
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
new file mode 100644
index 0000000..47be047
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.streaming.numbers;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.examples.java8.*;
+
+import java.util.*;
+
+/**
+ * Real time popular numbers counter.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will
+ * start node with {@code examples/config/example-compute.xml} configuration.
+ */
+public class QueryPopularNumbers {
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-compute.xml")) {
+            // Start new cache or get existing one.
+            try (IgniteCache<Integer, Long> stmCache = 
ignite.createCache(CacheConfig.configure())) {
+                if (!ExamplesUtils.hasServerNodes(ignite))
+                    return;
+
+                while (true) {
+                    // Select top 10 words.
+                    SqlFieldsQuery top10 = new SqlFieldsQuery(
+                        "select _key, _val from Long order by _val desc limit 
10");
+
+                    // Execute query.
+                    List<List<?>> results = 
stmCache.queryFields(top10).getAll();
+
+                    ExamplesUtils.printQueryResults(results);
+
+                    Thread.sleep(5000);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
new file mode 100644
index 0000000..96472a3
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.streaming.numbers;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.java8.*;
+import org.apache.ignite.stream.*;
+
+import java.util.*;
+
+/**
+ * Real time popular numbers counter.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will
+ * start node with {@code examples/config/example-compute.xml} configuration.
+ */
+public class StreamRandomNumbers {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-compute.xml")) {
+            // Create new cache or get existing one.
+            try (IgniteCache<Integer, Long> stmCache = 
ignite.createCache(CacheConfig.configure())) {
+                if (!ExamplesUtils.hasServerNodes(ignite))
+                    return;
+
+                try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(stmCache.getName())) {
+                    // Allow data updates.
+                    stmr.allowOverwrite(true);
+
+                    // Configure data transformation to count instances of the 
same word.
+                    stmr.receiver(new StreamTransformer<>((e, arg) -> {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }));
+
+
+                    // Stream random numbers into the streamer cache.
+                    while (true)
+                        stmr.addData(RAND.nextInt(RANGE), 1L);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java 
b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
index 4ff9a59..ea95bf1 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
@@ -24,11 +24,18 @@ import javax.cache.processor.*;
 import java.util.*;
 
 /**
- * Created by Dmitriy on 3/18/15.
+ * Convenience adapter to transform update existing values in streaming cache
+ * based on the previously cached value.
  */
 public class StreamTransformer<K, V> implements StreamReceiver<K, V> {
+    /** Entry processor. */
     private EntryProcessor<K, V, Object> ep;
 
+    /**
+     * Entry processor to update cache values based on the previously cached 
value.
+     *
+     * @param ep Entry processor.
+     */
     public StreamTransformer(CacheEntryProcessor<K, V, Object> ep) {
         this.ep = ep;
     }

Reply via email to