Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 9b2a16e69 -> 59e27ee65


# ignite-45 - fixing examples.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/59e27ee6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/59e27ee6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/59e27ee6

Branch: refs/heads/ignite-45
Commit: 59e27ee65627b87de50178372048fe48a339f396
Parents: 9b2a16e
Author: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Authored: Mon Mar 16 23:39:04 2015 -0700
Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Committed: Mon Mar 16 23:39:04 2015 -0700

----------------------------------------------------------------------
 .../datagrid/CachePopularNumbersExample.java    |   8 +-
 .../streaming/CachePopularNumbersExample.java   | 179 +++++++++++++++++++
 2 files changed, 183 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59e27ee6/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
index f16b71e..0b5fa15 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
@@ -127,9 +127,9 @@ public class CachePopularNumbersExample {
                 IgniteCache<Integer, Long> cache = ignite.jcache(CACHE_NAME);
 
                 try {
-                    List<List<?>> results = new ArrayList<>(cache.queryFields(
+                    List<List<?>> results = cache.queryFields(
                         new SqlFieldsQuery("select _key, _val from Long order 
by _val desc, _key limit ?").setArgs(cnt))
-                        .getAll());
+                        .getAll();
 
                     for (List<?> res : results)
                         System.out.println(res.get(0) + "=" + res.get(1));
@@ -152,8 +152,8 @@ public class CachePopularNumbersExample {
      */
     private static class IncrementingUpdater implements 
IgniteDataStreamer.Updater<Integer, Long> {
         /** Process entries to increase value by entry key. */
-        private static final EntryProcessor<Integer, Long, Void> INC = new 
EntryProcessor<Integer, Long, Void>() {
-            @Override public Void process(MutableEntry<Integer, Long> e, 
Object... args) {
+        private static final EntryProcessor<Integer, Long, ?> INC = new 
EntryProcessor<Integer, Long, Object>() {
+            @Override public Object process(MutableEntry<Integer, Long> e, 
Object... args) {
                 Long val = e.getValue();
 
                 e.setValue(val == null ? 1L : val + 1);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59e27ee6/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java
new file mode 100644
index 0000000..81d5f6c
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+
+import javax.cache.configuration.*;
+import javax.cache.expiry.*;
+import javax.cache.processor.*;
+import java.util.*;
+
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * Real time popular numbers counter.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
+ * <p>
+ * Alternatively you can run {@link 
org.apache.ignite.examples.ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-compute.xml} configuration.
+ */
+public class CachePopularNumbersExample {
+    /**
+     * Cache name.
+     */
+    private static final String STREAM_NAME = 
CachePopularNumbersExample.class.getSimpleName();
+
+    /**
+     * Count of most popular numbers to retrieve from cluster.
+     */
+    private static final int POPULAR_NUMBERS_CNT = 10;
+
+    /**
+     * Random number generator.
+     */
+    private static final Random RAND = new Random();
+
+    /**
+     * Range within which to generate numbers.
+     */
+    private static final int RANGE = 1000;
+
+    /**
+     * Count of total numbers to generate.
+     */
+    private static final int CNT = 1000000;
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws org.apache.ignite.IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = Ignition.start()) {
+            System.out.println();
+            System.out.println(">>> Cache popular numbers example started.");
+
+            /*
+             * Configure streaming cache.
+             * =========================
+             */
+            CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>();
+
+            cfg.setCacheMode(CacheMode.PARTITIONED);
+            cfg.setName(STREAM_NAME);
+            cfg.setIndexedTypes(Integer.class, Long.class);
+
+            // Sliding window of 5 seconds.
+            cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new 
CreatedExpiryPolicy(new Duration(SECONDS, 5))));
+
+            /**
+             * Start the streaming cache on all server nodes.
+             * ============================================
+             */
+            try (IgniteCache<Integer, Long> stmCache = 
ignite.createCache(cfg)) {
+                // Check that that server nodes have been started.
+                if 
(ignite.cluster().forCacheNodes(STREAM_NAME).nodes().isEmpty()) {
+                    System.out.println("Ignite does not have streaming cache 
configured: " + STREAM_NAME);
+
+                    return;
+                }
+
+                // Stream random numbers from another thread.
+                Thread th = new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        streamData(ignite);
+                    }
+                });
+
+                th.start();
+
+                // Run this example for 3 minutes.
+                long duration = 3 * 60 * 60 * 1000;
+
+                long start = System.currentTimeMillis();
+
+                while (System.currentTimeMillis() - start < duration) {
+                    // Select top 10 words.
+                    SqlFieldsQuery top10 = new SqlFieldsQuery(
+                        "select _key, _val from Long order by _val desc, _key 
limit ?").setArgs(10);
+
+                    List<List<?>> results = 
stmCache.queryFields(top10).getAll();
+
+                    for (List<?> res : results)
+                        System.out.println(res.get(0) + "=" + res.get(1));
+
+                    System.out.println("----------------");
+
+                    Thread.sleep(5000);
+                }
+
+                th.interrupt();
+                th.join();
+            }
+        }
+    }
+
+    /**
+     * Populates cache in real time with numbers and keeps count for every 
number.
+     *
+     * @param ignite Ignite.
+     * @throws org.apache.ignite.IgniteException If failed.
+     */
+    private static void streamData(final Ignite ignite) throws IgniteException 
{
+        try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(STREAM_NAME)) {
+            stmr.allowOverwrite(true); // Allow data updates.
+            stmr.updater(new IncrementingUpdater());
+
+            while (!Thread.currentThread().isInterrupted())
+                stmr.addData(RAND.nextInt(RANGE), 1L);
+        }
+    }
+
+    /**
+     * Increments value for key.
+     */
+    private static class IncrementingUpdater implements 
IgniteDataStreamer.Updater<Integer, Long> {
+        @Override
+        public void update(IgniteCache<Integer, Long> cache, 
Collection<Map.Entry<Integer, Long>> entries) {
+            for (Map.Entry<Integer, Long> entry : entries) {
+                // Increment values by 1.
+                cache.invoke(entry.getKey(), new EntryProcessor<Integer, Long, 
Object>() {
+                    @Override
+                    public Object process(MutableEntry<Integer, Long> e, 
Object... args) {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+            }
+        }
+    }
+}

Reply via email to