Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 33c3d4348 -> 6acc79d31


# ignite-45 - fixing examples.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6acc79d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6acc79d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6acc79d3

Branch: refs/heads/ignite-45
Commit: 6acc79d3107f1e9664223d14758818ebcfecab5d
Parents: 33c3d43
Author: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Authored: Tue Mar 17 16:58:08 2015 -0700
Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Committed: Tue Mar 17 16:58:23 2015 -0700

----------------------------------------------------------------------
 .../streaming/CachePopularNumbersExample.java   | 188 -----------
 .../StreamingPopularNumbersExample.java         | 314 ++++++++-----------
 2 files changed, 129 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6acc79d3/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java
deleted file mode 100644
index 7920e35..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.configuration.*;
-
-import javax.cache.*;
-import javax.cache.configuration.*;
-import javax.cache.expiry.*;
-import javax.cache.processor.*;
-import java.util.*;
-
-import static java.util.concurrent.TimeUnit.*;
-
-/**
- * Real time popular numbers counter.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
- * <p>
- * Alternatively you can run {@link 
org.apache.ignite.examples.ExampleNodeStartup} in another JVM which will
- * start node with {@code examples/config/example-compute.xml} configuration.
- */
-public class CachePopularNumbersExample {
-    /**
-     * Cache name.
-     */
-    private static final String STREAM_NAME = 
CachePopularNumbersExample.class.getSimpleName();
-
-    /**
-     * Count of most popular numbers to retrieve from cluster.
-     */
-    private static final int POPULAR_NUMBERS_CNT = 10;
-
-    /**
-     * Random number generator.
-     */
-    private static final Random RAND = new Random();
-
-    /**
-     * Range within which to generate numbers.
-     */
-    private static final int RANGE = 1000;
-
-    /**
-     * Count of total numbers to generate.
-     */
-    private static final int CNT = 1000000;
-
-    /**
-     * Executes example.
-     *
-     * @param args Command line arguments, none required.
-     * @throws org.apache.ignite.IgniteException If example execution failed.
-     */
-    public static void main(String[] args) throws Exception {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = 
Ignition.start("examples/config/example-compute.xml")) {
-            System.out.println();
-            System.out.println(">>> Cache popular numbers example started.");
-
-            /*
-             * Configure streaming cache.
-             * =========================
-             */
-            CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>();
-
-            cfg.setCacheMode(CacheMode.PARTITIONED);
-            cfg.setName(STREAM_NAME);
-            cfg.setIndexedTypes(Integer.class, Long.class);
-
-            // Sliding window of 1 seconds.
-            cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new 
CreatedExpiryPolicy(new Duration(SECONDS, 1))));
-
-            /**
-             * Start the streaming cache on all server nodes.
-             * ============================================
-             */
-            try (IgniteCache<Integer, Long> stmCache = 
ignite.createCache(cfg)) {
-                // Check that that server nodes have been started.
-                if 
(ignite.cluster().forDataNodes(STREAM_NAME).nodes().isEmpty()) {
-                    System.out.println("Ignite does not have streaming cache 
configured: " + STREAM_NAME);
-
-                    return;
-                }
-
-                // Stream random numbers from another thread.
-                Thread th = new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        streamData(ignite);
-                    }
-                });
-
-                th.start();
-
-                // Run this example for 2 minutes.
-                long duration = 2 * 60 * 1000;
-
-                long start = System.currentTimeMillis();
-
-                while (System.currentTimeMillis() - start < duration) {
-                    // Select top 10 words.
-                    SqlFieldsQuery top10 = new SqlFieldsQuery(
-                        "select _key, _val from Long order by _val desc, _key 
limit ?").setArgs(10);
-
-                    List<List<?>> results = 
stmCache.queryFields(top10).getAll();
-
-                    for (List<?> res : results)
-                        System.out.println(res.get(0) + "=" + res.get(1));
-
-                    System.out.println("----------------");
-
-                    Thread.sleep(5000);
-                }
-
-                th.interrupt();
-                th.join();
-            }
-            catch (CacheException e) {
-                e.printStackTrace();
-
-                System.out.println("Destroying cache for name '" + STREAM_NAME 
+ "'. Please try again.");
-
-                ignite.destroyCache(STREAM_NAME);
-            }
-        }
-    }
-
-    /**
-     * Populates cache in real time with numbers and keeps count for every 
number.
-     *
-     * @param ignite Ignite.
-     * @throws org.apache.ignite.IgniteException If failed.
-     */
-    private static void streamData(final Ignite ignite) throws IgniteException 
{
-        try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(STREAM_NAME)) {
-            stmr.allowOverwrite(true); // Allow data updates.
-            stmr.updater(new IncrementingUpdater());
-
-            while (!Thread.currentThread().isInterrupted())
-                stmr.addData(RAND.nextInt(RANGE), 1L);
-        }
-        catch (IgniteInterruptedException ignore) {}
-    }
-
-    /**
-     * Increments value for key.
-     */
-    private static class IncrementingUpdater implements 
IgniteDataStreamer.Updater<Integer, Long> {
-        @Override
-        public void update(IgniteCache<Integer, Long> cache, 
Collection<Map.Entry<Integer, Long>> entries) {
-            for (Map.Entry<Integer, Long> entry : entries) {
-                // Increment values by 1.
-                cache.invoke(entry.getKey(), new EntryProcessor<Integer, Long, 
Object>() {
-                    @Override
-                    public Object process(MutableEntry<Integer, Long> e, 
Object... args) {
-                        Long val = e.getValue();
-
-                        e.setValue(val == null ? 1L : val + 1);
-
-                        return null;
-                    }
-                });
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6acc79d3/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java
index 95a7272..847debe 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java
@@ -18,233 +18,177 @@
 package org.apache.ignite.examples.streaming;
 
 import org.apache.ignite.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streamer.*;
-import org.apache.ignite.streamer.index.*;
-import org.jetbrains.annotations.*;
-
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.util.*;
 
+import static java.util.concurrent.TimeUnit.*;
+
 /**
- * Real time streaming popular numbers counter. This example receives a 
constant stream of
- * random numbers. The gaussian distribution is chosen to make sure that 
numbers closer
- * to 0 have higher probability. This example will find {@link 
#POPULAR_NUMBERS_CNT} number
- * of popular numbers over last N number of numbers, where N is specified as 
streamer
- * window size in {@code examples/config/example-streamer.xml} configuration 
file and
- * is set to {@code 10,000}.
+ * Real time popular numbers counter.
  * <p>
- * Remote nodes should always be started with special configuration file:
- * {@code 'ignite.{sh|bat} examples/config/example-streamer.xml'}.
- * When starting nodes this way JAR file containing the examples code
- * should be placed to {@code IGNITE_HOME/libs} folder. You can build
- * {@code ignite-examples.jar} by running {@code mvn package} in
- * {@code IGNITE_HOME/examples} folder. After that {@code ignite-examples.jar}
- * will be generated by Maven in {@code IGNITE_HOME/examples/target} folder.
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
  * <p>
- * Alternatively you can run {@link StreamingNodeStartup} in another JVM which 
will start node
- * with {@code examples/config/example-streamer.xml} configuration.
+ * Alternatively you can run {@link 
org.apache.ignite.examples.ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-compute.xml} configuration.
  */
 public class StreamingPopularNumbersExample {
-    /** Count of most popular numbers to retrieve from ignite. */
+    /**
+     * Cache name.
+     */
+    private static final String STREAM_NAME = 
StreamingPopularNumbersExample.class.getSimpleName();
+
+    /**
+     * Count of most popular numbers to retrieve from cluster.
+     */
     private static final int POPULAR_NUMBERS_CNT = 10;
 
-    /** Random number generator. */
+    /**
+     * Random number generator.
+     */
     private static final Random RAND = new Random();
 
-    /** Count of total numbers to generate. */
-    private static final int CNT = 10_000_000;
-
-    /** Comparator sorting random number entries by number popularity. */
-    private static final Comparator<StreamerIndexEntry<Integer, Integer, 
Long>> CMP =
-        new Comparator<StreamerIndexEntry<Integer, Integer, Long>>() {
-            @Override public int compare(StreamerIndexEntry<Integer, Integer, 
Long> e1,
-                StreamerIndexEntry<Integer, Integer, Long> e2) {
-                return e2.value().compareTo(e1.value());
-            }
-        };
-
-    /** Reducer selecting first POPULAR_NUMBERS_CNT values. */
-    private static class PopularNumbersReducer implements 
IgniteReducer<Collection<StreamerIndexEntry<Integer, Integer, Long>>,
-            Collection<StreamerIndexEntry<Integer, Integer, Long>>> {
-        /** */
-        private final List<StreamerIndexEntry<Integer, Integer, Long>> sorted 
= new ArrayList<>();
-
-        /** {@inheritDoc} */
-        @Override public boolean collect(@Nullable 
Collection<StreamerIndexEntry<Integer, Integer, Long>> col) {
-            if (col != null && !col.isEmpty())
-                // Add result from remote node to sorted set.
-                sorted.addAll(col);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<StreamerIndexEntry<Integer, Integer, 
Long>> reduce() {
-            Collections.sort(sorted, CMP);
+    /**
+     * Range within which to generate numbers.
+     */
+    private static final int RANGE = 1000;
 
-            return sorted.subList(0, POPULAR_NUMBERS_CNT < sorted.size() ? 
POPULAR_NUMBERS_CNT : sorted.size());
-        }
-    }
+    /**
+     * Count of total numbers to generate.
+     */
+    private static final int CNT = 1000000;
 
     /**
      * Executes example.
      *
      * @param args Command line arguments, none required.
-     * @throws IgniteException If example execution failed.
+     * @throws org.apache.ignite.IgniteException If example execution failed.
      */
-    public static void main(String[] args) throws IgniteException {
-        Timer popularNumbersQryTimer = new Timer("numbers-query-worker");
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-compute.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache popular numbers example started.");
+
+            /*
+             * Configure streaming cache.
+             * =========================
+             */
+            CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>();
+
+            cfg.setCacheMode(CacheMode.PARTITIONED);
+            cfg.setName(STREAM_NAME);
+            cfg.setIndexedTypes(Integer.class, Long.class);
+
+            // Sliding window of 1 seconds.
+            cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new 
CreatedExpiryPolicy(new Duration(SECONDS, 1))));
+
+            /**
+             * Start the streaming cache on all server nodes.
+             * ============================================
+             */
+            try (IgniteCache<Integer, Long> stmCache = 
ignite.createCache(cfg)) {
+                // Check that that server nodes have been started.
+                if 
(ignite.cluster().forDataNodes(STREAM_NAME).nodes().isEmpty()) {
+                    System.out.println("Ignite does not have streaming cache 
configured: " + STREAM_NAME);
+
+                    return;
+                }
 
-        // Start ignite.
-        final Ignite ignite = 
Ignition.start("examples/config/example-streamer.xml");
+                // Stream random numbers from another thread.
+                Thread th = new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(STREAM_NAME)) {
+                            stmr.allowOverwrite(true); // Allow data updates.
+                            stmr.updater(new IncrementingUpdater());
+
+                            while (!Thread.currentThread().isInterrupted())
+                                stmr.addData(RAND.nextInt(RANGE), 1L);
+                        }
+                    }
+                });
 
-        System.out.println();
-        System.out.println(">>> Streaming popular numbers example started.");
+                th.start();
 
-        try {
-            // Schedule query to find most popular words to run every 3 
seconds.
-            TimerTask task = scheduleQuery(ignite, popularNumbersQryTimer);
+                // Run this example for 2 minutes.
+                long duration = 2 * 60 * 1000;
 
-            streamData(ignite);
+                long start = System.currentTimeMillis();
 
-            // Force one more run to get final counts.
-            task.run();
+                while (System.currentTimeMillis() - start < duration) {
+                    // Select top 10 words.
+                    SqlFieldsQuery top10 = new SqlFieldsQuery(
+                        "select _key, _val from Long order by _val desc, _key 
limit ?").setArgs(10);
 
-            popularNumbersQryTimer.cancel();
+                    List<List<?>> results = 
stmCache.queryFields(top10).getAll();
 
-            // Reset all streamers on all nodes to make sure that
-            // consecutive executions start from scratch.
-            ignite.compute().broadcast(new IgniteRunnable() {
-                @Override public void run() {
-                    if (!ExamplesUtils.hasStreamer(ignite, "popular-numbers"))
-                        System.err.println("Default streamer not found (is 
example-streamer.xml " +
-                            "configuration used on all nodes?)");
-                    else {
-                        IgniteStreamer streamer = 
ignite.streamer("popular-numbers");
+                    for (List<?> res : results)
+                        System.out.println(res.get(0) + "=" + res.get(1));
 
-                        System.out.println("Clearing number counters from 
streamer.");
+                    System.out.println("----------------");
 
-                        streamer.reset();
-                    }
+                    Thread.sleep(5000);
                 }
-            });
-        }
-        finally {
-            Ignition.stop(true);
-        }
-    }
-
-    /**
-     * Streams random numbers into the system.
-     *
-     * @param ignite Ignite.
-     * @throws IgniteException If failed.
-     */
-    private static void streamData(final Ignite ignite) throws IgniteException 
{
-        final IgniteStreamer streamer = ignite.streamer("popular-numbers");
-
-        // Use gaussian distribution to ensure that
-        // numbers closer to 0 have higher probability.
-        for (int i = 0; i < CNT; i++)
-            streamer.addEvent(((Double)(RAND.nextGaussian() * 10)).intValue());
-    }
-
-    /**
-     * Schedules our popular numbers query to run every 3 seconds.
-     *
-     * @param ignite Ignite.
-     * @param timer Timer.
-     * @return Scheduled task.
-     */
-    private static TimerTask scheduleQuery(final Ignite ignite, Timer timer) {
-        TimerTask task = new TimerTask() {
-            @Override public void run() {
-                final IgniteStreamer streamer = 
ignite.streamer("popular-numbers");
-
-                try {
-                    // Send reduce query to all 'popular-numbers' streamers
-                    // running on local and remote nodes.
-                    Collection<StreamerIndexEntry<Integer, Integer, Long>> col 
= streamer.context().reduce(
-                        // This closure will execute on remote nodes.
-                        new IgniteClosure<StreamerContext,
-                                                                            
Collection<StreamerIndexEntry<Integer, Integer, Long>>>() {
-                            @Override public 
Collection<StreamerIndexEntry<Integer, Integer, Long>> apply(
-                                StreamerContext ctx) {
-                                StreamerIndex<Integer, Integer, Long> view = 
ctx.<Integer>window().index();
-
-                                return view.entries(-1 * POPULAR_NUMBERS_CNT);
-                            }
-                        },
-                        // The reducer will always execute locally, on the 
same node
-                        // that submitted the query.
-                        new PopularNumbersReducer());
-
-                    for (StreamerIndexEntry<Integer, Integer, Long> cntr : col)
-                        System.out.printf("%3d=%d\n", cntr.key(), 
cntr.value());
 
-                    System.out.println("----------------");
-                }
-                catch (IgniteException e) {
-                    e.printStackTrace();
-                }
+                th.interrupt();
+                th.join();
             }
-        };
+            catch (CacheException e) {
+                e.printStackTrace();
 
-        timer.schedule(task, 3000, 3000);
+                System.out.println("Destroying cache for name '" + STREAM_NAME 
+ "'. Please try again.");
 
-        return task;
+                ignite.destroyCache(STREAM_NAME);
+            }
+        }
     }
 
     /**
-     * Sample streamer stage to compute average.
+     * Populates cache in real time with numbers and keeps count for every 
number.
+     *
+     * @param ignite Ignite.
+     * @throws org.apache.ignite.IgniteException If failed.
      */
-    @SuppressWarnings("PublicInnerClass")
-    public static class StreamerStage implements 
org.apache.ignite.streamer.StreamerStage<Integer> {
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return "exampleStage";
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Map<String, Collection<?>> 
run(StreamerContext ctx, Collection<Integer> nums) {
-            StreamerWindow<Integer> win = ctx.window();
-
-            // Add numbers to window.
-            win.enqueueAll(nums);
-
-            // Clear evicted numbers.
-            win.clearEvicted();
+    private static void streamData(final Ignite ignite) throws IgniteException 
{
+        try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(STREAM_NAME)) {
+            stmr.allowOverwrite(true); // Allow data updates.
+            stmr.updater(new IncrementingUpdater());
 
-            // Null means that there are no more stages
-            // and that stage pipeline is completed.
-            return null;
+            while (!Thread.currentThread().isInterrupted())
+                stmr.addData(RAND.nextInt(RANGE), 1L);
         }
+        catch (IgniteInterruptedException ignore) {}
     }
 
     /**
-     * This class will be set as part of window index configuration.
+     * Increments value for key.
      */
-    private static class IndexUpdater implements StreamerIndexUpdater<Integer, 
Integer, Long> {
-        /** {@inheritDoc} */
-        @Override public Integer indexKey(Integer evt) {
-            // We use event as index key, so event and key are the same.
-            return evt;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Long onAdded(StreamerIndexEntry<Integer, 
Integer, Long> entry, Integer evt) {
-            return entry.value() + 1;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Long onRemoved(StreamerIndexEntry<Integer, 
Integer, Long> entry, Integer evt) {
-            return entry.value() - 1 == 0 ? null : entry.value() - 1;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Long initialValue(Integer evt, Integer key) {
-            return 1L;
+    private static class IncrementingUpdater implements 
IgniteDataStreamer.Updater<Integer, Long> {
+        @Override
+        public void update(IgniteCache<Integer, Long> cache, 
Collection<Map.Entry<Integer, Long>> entries) {
+            for (Map.Entry<Integer, Long> entry : entries) {
+                // Increment values by 1.
+                cache.invoke(entry.getKey(), new EntryProcessor<Integer, Long, 
Object>() {
+                    @Override
+                    public Object process(MutableEntry<Integer, Long> e, 
Object... args) {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+            }
         }
     }
 }

Reply via email to