Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 9b2a16e69 -> 59e27ee65
# ignite-45 - fixing examples. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/59e27ee6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/59e27ee6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/59e27ee6 Branch: refs/heads/ignite-45 Commit: 59e27ee65627b87de50178372048fe48a339f396 Parents: 9b2a16e Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Mon Mar 16 23:39:04 2015 -0700 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Mon Mar 16 23:39:04 2015 -0700 ---------------------------------------------------------------------- .../datagrid/CachePopularNumbersExample.java | 8 +- .../streaming/CachePopularNumbersExample.java | 179 +++++++++++++++++++ 2 files changed, 183 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59e27ee6/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java index f16b71e..0b5fa15 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java @@ -127,9 +127,9 @@ public class CachePopularNumbersExample { IgniteCache<Integer, Long> cache = ignite.jcache(CACHE_NAME); try { - List<List<?>> results = new ArrayList<>(cache.queryFields( + List<List<?>> results = cache.queryFields( new SqlFieldsQuery("select _key, _val from Long order by _val desc, _key limit ?").setArgs(cnt)) - .getAll()); + .getAll(); for (List<?> res : results) System.out.println(res.get(0) + "=" + res.get(1)); @@ -152,8 +152,8 @@ public class CachePopularNumbersExample { */ private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> { /** Process entries to increase value by entry key. */ - private static final EntryProcessor<Integer, Long, Void> INC = new EntryProcessor<Integer, Long, Void>() { - @Override public Void process(MutableEntry<Integer, Long> e, Object... args) { + private static final EntryProcessor<Integer, Long, ?> INC = new EntryProcessor<Integer, Long, Object>() { + @Override public Object process(MutableEntry<Integer, Long> e, Object... args) { Long val = e.getValue(); e.setValue(val == null ? 1L : val + 1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59e27ee6/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java new file mode 100644 index 0000000..81d5f6c --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; + +import javax.cache.configuration.*; +import javax.cache.expiry.*; +import javax.cache.processor.*; +import java.util.*; + +import static java.util.concurrent.TimeUnit.*; + +/** + * Real time popular numbers counter. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. + * <p> + * Alternatively you can run {@link org.apache.ignite.examples.ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-compute.xml} configuration. + */ +public class CachePopularNumbersExample { + /** + * Cache name. + */ + private static final String STREAM_NAME = CachePopularNumbersExample.class.getSimpleName(); + + /** + * Count of most popular numbers to retrieve from cluster. + */ + private static final int POPULAR_NUMBERS_CNT = 10; + + /** + * Random number generator. + */ + private static final Random RAND = new Random(); + + /** + * Range within which to generate numbers. + */ + private static final int RANGE = 1000; + + /** + * Count of total numbers to generate. + */ + private static final int CNT = 1000000; + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws org.apache.ignite.IgniteException If example execution failed. + */ + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start()) { + System.out.println(); + System.out.println(">>> Cache popular numbers example started."); + + /* + * Configure streaming cache. + * ========================= + */ + CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(); + + cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setName(STREAM_NAME); + cfg.setIndexedTypes(Integer.class, Long.class); + + // Sliding window of 5 seconds. + cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, 5)))); + + /** + * Start the streaming cache on all server nodes. + * ============================================ + */ + try (IgniteCache<Integer, Long> stmCache = ignite.createCache(cfg)) { + // Check that that server nodes have been started. + if (ignite.cluster().forCacheNodes(STREAM_NAME).nodes().isEmpty()) { + System.out.println("Ignite does not have streaming cache configured: " + STREAM_NAME); + + return; + } + + // Stream random numbers from another thread. + Thread th = new Thread(new Runnable() { + @Override + public void run() { + streamData(ignite); + } + }); + + th.start(); + + // Run this example for 3 minutes. + long duration = 3 * 60 * 60 * 1000; + + long start = System.currentTimeMillis(); + + while (System.currentTimeMillis() - start < duration) { + // Select top 10 words. + SqlFieldsQuery top10 = new SqlFieldsQuery( + "select _key, _val from Long order by _val desc, _key limit ?").setArgs(10); + + List<List<?>> results = stmCache.queryFields(top10).getAll(); + + for (List<?> res : results) + System.out.println(res.get(0) + "=" + res.get(1)); + + System.out.println("----------------"); + + Thread.sleep(5000); + } + + th.interrupt(); + th.join(); + } + } + } + + /** + * Populates cache in real time with numbers and keeps count for every number. + * + * @param ignite Ignite. + * @throws org.apache.ignite.IgniteException If failed. + */ + private static void streamData(final Ignite ignite) throws IgniteException { + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(STREAM_NAME)) { + stmr.allowOverwrite(true); // Allow data updates. + stmr.updater(new IncrementingUpdater()); + + while (!Thread.currentThread().isInterrupted()) + stmr.addData(RAND.nextInt(RANGE), 1L); + } + } + + /** + * Increments value for key. + */ + private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> { + @Override + public void update(IgniteCache<Integer, Long> cache, Collection<Map.Entry<Integer, Long>> entries) { + for (Map.Entry<Integer, Long> entry : entries) { + // Increment values by 1. + cache.invoke(entry.getKey(), new EntryProcessor<Integer, Long, Object>() { + @Override + public Object process(MutableEntry<Integer, Long> e, Object... args) { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + } + } + } +}