Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 33c3d4348 -> 6acc79d31
# ignite-45 - fixing examples. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6acc79d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6acc79d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6acc79d3 Branch: refs/heads/ignite-45 Commit: 6acc79d3107f1e9664223d14758818ebcfecab5d Parents: 33c3d43 Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Tue Mar 17 16:58:08 2015 -0700 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Tue Mar 17 16:58:23 2015 -0700 ---------------------------------------------------------------------- .../streaming/CachePopularNumbersExample.java | 188 ----------- .../StreamingPopularNumbersExample.java | 314 ++++++++----------- 2 files changed, 129 insertions(+), 373 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6acc79d3/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java deleted file mode 100644 index 7920e35..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/CachePopularNumbersExample.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.configuration.*; - -import javax.cache.*; -import javax.cache.configuration.*; -import javax.cache.expiry.*; -import javax.cache.processor.*; -import java.util.*; - -import static java.util.concurrent.TimeUnit.*; - -/** - * Real time popular numbers counter. - * <p> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. - * <p> - * Alternatively you can run {@link org.apache.ignite.examples.ExampleNodeStartup} in another JVM which will - * start node with {@code examples/config/example-compute.xml} configuration. - */ -public class CachePopularNumbersExample { - /** - * Cache name. - */ - private static final String STREAM_NAME = CachePopularNumbersExample.class.getSimpleName(); - - /** - * Count of most popular numbers to retrieve from cluster. - */ - private static final int POPULAR_NUMBERS_CNT = 10; - - /** - * Random number generator. - */ - private static final Random RAND = new Random(); - - /** - * Range within which to generate numbers. - */ - private static final int RANGE = 1000; - - /** - * Count of total numbers to generate. - */ - private static final int CNT = 1000000; - - /** - * Executes example. - * - * @param args Command line arguments, none required. - * @throws org.apache.ignite.IgniteException If example execution failed. - */ - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { - System.out.println(); - System.out.println(">>> Cache popular numbers example started."); - - /* - * Configure streaming cache. - * ========================= - */ - CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(); - - cfg.setCacheMode(CacheMode.PARTITIONED); - cfg.setName(STREAM_NAME); - cfg.setIndexedTypes(Integer.class, Long.class); - - // Sliding window of 1 seconds. - cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, 1)))); - - /** - * Start the streaming cache on all server nodes. - * ============================================ - */ - try (IgniteCache<Integer, Long> stmCache = ignite.createCache(cfg)) { - // Check that that server nodes have been started. - if (ignite.cluster().forDataNodes(STREAM_NAME).nodes().isEmpty()) { - System.out.println("Ignite does not have streaming cache configured: " + STREAM_NAME); - - return; - } - - // Stream random numbers from another thread. - Thread th = new Thread(new Runnable() { - @Override - public void run() { - streamData(ignite); - } - }); - - th.start(); - - // Run this example for 2 minutes. - long duration = 2 * 60 * 1000; - - long start = System.currentTimeMillis(); - - while (System.currentTimeMillis() - start < duration) { - // Select top 10 words. - SqlFieldsQuery top10 = new SqlFieldsQuery( - "select _key, _val from Long order by _val desc, _key limit ?").setArgs(10); - - List<List<?>> results = stmCache.queryFields(top10).getAll(); - - for (List<?> res : results) - System.out.println(res.get(0) + "=" + res.get(1)); - - System.out.println("----------------"); - - Thread.sleep(5000); - } - - th.interrupt(); - th.join(); - } - catch (CacheException e) { - e.printStackTrace(); - - System.out.println("Destroying cache for name '" + STREAM_NAME + "'. Please try again."); - - ignite.destroyCache(STREAM_NAME); - } - } - } - - /** - * Populates cache in real time with numbers and keeps count for every number. - * - * @param ignite Ignite. - * @throws org.apache.ignite.IgniteException If failed. - */ - private static void streamData(final Ignite ignite) throws IgniteException { - try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(STREAM_NAME)) { - stmr.allowOverwrite(true); // Allow data updates. - stmr.updater(new IncrementingUpdater()); - - while (!Thread.currentThread().isInterrupted()) - stmr.addData(RAND.nextInt(RANGE), 1L); - } - catch (IgniteInterruptedException ignore) {} - } - - /** - * Increments value for key. - */ - private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> { - @Override - public void update(IgniteCache<Integer, Long> cache, Collection<Map.Entry<Integer, Long>> entries) { - for (Map.Entry<Integer, Long> entry : entries) { - // Increment values by 1. - cache.invoke(entry.getKey(), new EntryProcessor<Integer, Long, Object>() { - @Override - public Object process(MutableEntry<Integer, Long> e, Object... args) { - Long val = e.getValue(); - - e.setValue(val == null ? 1L : val + 1); - - return null; - } - }); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6acc79d3/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java index 95a7272..847debe 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java @@ -18,233 +18,177 @@ package org.apache.ignite.examples.streaming; import org.apache.ignite.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.index.*; -import org.jetbrains.annotations.*; - +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.expiry.*; +import javax.cache.processor.*; import java.util.*; +import static java.util.concurrent.TimeUnit.*; + /** - * Real time streaming popular numbers counter. This example receives a constant stream of - * random numbers. The gaussian distribution is chosen to make sure that numbers closer - * to 0 have higher probability. This example will find {@link #POPULAR_NUMBERS_CNT} number - * of popular numbers over last N number of numbers, where N is specified as streamer - * window size in {@code examples/config/example-streamer.xml} configuration file and - * is set to {@code 10,000}. + * Real time popular numbers counter. * <p> - * Remote nodes should always be started with special configuration file: - * {@code 'ignite.{sh|bat} examples/config/example-streamer.xml'}. - * When starting nodes this way JAR file containing the examples code - * should be placed to {@code IGNITE_HOME/libs} folder. You can build - * {@code ignite-examples.jar} by running {@code mvn package} in - * {@code IGNITE_HOME/examples} folder. After that {@code ignite-examples.jar} - * will be generated by Maven in {@code IGNITE_HOME/examples/target} folder. + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. * <p> - * Alternatively you can run {@link StreamingNodeStartup} in another JVM which will start node - * with {@code examples/config/example-streamer.xml} configuration. + * Alternatively you can run {@link org.apache.ignite.examples.ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-compute.xml} configuration. */ public class StreamingPopularNumbersExample { - /** Count of most popular numbers to retrieve from ignite. */ + /** + * Cache name. + */ + private static final String STREAM_NAME = StreamingPopularNumbersExample.class.getSimpleName(); + + /** + * Count of most popular numbers to retrieve from cluster. + */ private static final int POPULAR_NUMBERS_CNT = 10; - /** Random number generator. */ + /** + * Random number generator. + */ private static final Random RAND = new Random(); - /** Count of total numbers to generate. */ - private static final int CNT = 10_000_000; - - /** Comparator sorting random number entries by number popularity. */ - private static final Comparator<StreamerIndexEntry<Integer, Integer, Long>> CMP = - new Comparator<StreamerIndexEntry<Integer, Integer, Long>>() { - @Override public int compare(StreamerIndexEntry<Integer, Integer, Long> e1, - StreamerIndexEntry<Integer, Integer, Long> e2) { - return e2.value().compareTo(e1.value()); - } - }; - - /** Reducer selecting first POPULAR_NUMBERS_CNT values. */ - private static class PopularNumbersReducer implements IgniteReducer<Collection<StreamerIndexEntry<Integer, Integer, Long>>, - Collection<StreamerIndexEntry<Integer, Integer, Long>>> { - /** */ - private final List<StreamerIndexEntry<Integer, Integer, Long>> sorted = new ArrayList<>(); - - /** {@inheritDoc} */ - @Override public boolean collect(@Nullable Collection<StreamerIndexEntry<Integer, Integer, Long>> col) { - if (col != null && !col.isEmpty()) - // Add result from remote node to sorted set. - sorted.addAll(col); - - return true; - } - - /** {@inheritDoc} */ - @Override public Collection<StreamerIndexEntry<Integer, Integer, Long>> reduce() { - Collections.sort(sorted, CMP); + /** + * Range within which to generate numbers. + */ + private static final int RANGE = 1000; - return sorted.subList(0, POPULAR_NUMBERS_CNT < sorted.size() ? POPULAR_NUMBERS_CNT : sorted.size()); - } - } + /** + * Count of total numbers to generate. + */ + private static final int CNT = 1000000; /** * Executes example. * * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. + * @throws org.apache.ignite.IgniteException If example execution failed. */ - public static void main(String[] args) throws IgniteException { - Timer popularNumbersQryTimer = new Timer("numbers-query-worker"); + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { + System.out.println(); + System.out.println(">>> Cache popular numbers example started."); + + /* + * Configure streaming cache. + * ========================= + */ + CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(); + + cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setName(STREAM_NAME); + cfg.setIndexedTypes(Integer.class, Long.class); + + // Sliding window of 1 seconds. + cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, 1)))); + + /** + * Start the streaming cache on all server nodes. + * ============================================ + */ + try (IgniteCache<Integer, Long> stmCache = ignite.createCache(cfg)) { + // Check that that server nodes have been started. + if (ignite.cluster().forDataNodes(STREAM_NAME).nodes().isEmpty()) { + System.out.println("Ignite does not have streaming cache configured: " + STREAM_NAME); + + return; + } - // Start ignite. - final Ignite ignite = Ignition.start("examples/config/example-streamer.xml"); + // Stream random numbers from another thread. + Thread th = new Thread(new Runnable() { + @Override + public void run() { + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(STREAM_NAME)) { + stmr.allowOverwrite(true); // Allow data updates. + stmr.updater(new IncrementingUpdater()); + + while (!Thread.currentThread().isInterrupted()) + stmr.addData(RAND.nextInt(RANGE), 1L); + } + } + }); - System.out.println(); - System.out.println(">>> Streaming popular numbers example started."); + th.start(); - try { - // Schedule query to find most popular words to run every 3 seconds. - TimerTask task = scheduleQuery(ignite, popularNumbersQryTimer); + // Run this example for 2 minutes. + long duration = 2 * 60 * 1000; - streamData(ignite); + long start = System.currentTimeMillis(); - // Force one more run to get final counts. - task.run(); + while (System.currentTimeMillis() - start < duration) { + // Select top 10 words. + SqlFieldsQuery top10 = new SqlFieldsQuery( + "select _key, _val from Long order by _val desc, _key limit ?").setArgs(10); - popularNumbersQryTimer.cancel(); + List<List<?>> results = stmCache.queryFields(top10).getAll(); - // Reset all streamers on all nodes to make sure that - // consecutive executions start from scratch. - ignite.compute().broadcast(new IgniteRunnable() { - @Override public void run() { - if (!ExamplesUtils.hasStreamer(ignite, "popular-numbers")) - System.err.println("Default streamer not found (is example-streamer.xml " + - "configuration used on all nodes?)"); - else { - IgniteStreamer streamer = ignite.streamer("popular-numbers"); + for (List<?> res : results) + System.out.println(res.get(0) + "=" + res.get(1)); - System.out.println("Clearing number counters from streamer."); + System.out.println("----------------"); - streamer.reset(); - } + Thread.sleep(5000); } - }); - } - finally { - Ignition.stop(true); - } - } - - /** - * Streams random numbers into the system. - * - * @param ignite Ignite. - * @throws IgniteException If failed. - */ - private static void streamData(final Ignite ignite) throws IgniteException { - final IgniteStreamer streamer = ignite.streamer("popular-numbers"); - - // Use gaussian distribution to ensure that - // numbers closer to 0 have higher probability. - for (int i = 0; i < CNT; i++) - streamer.addEvent(((Double)(RAND.nextGaussian() * 10)).intValue()); - } - - /** - * Schedules our popular numbers query to run every 3 seconds. - * - * @param ignite Ignite. - * @param timer Timer. - * @return Scheduled task. - */ - private static TimerTask scheduleQuery(final Ignite ignite, Timer timer) { - TimerTask task = new TimerTask() { - @Override public void run() { - final IgniteStreamer streamer = ignite.streamer("popular-numbers"); - - try { - // Send reduce query to all 'popular-numbers' streamers - // running on local and remote nodes. - Collection<StreamerIndexEntry<Integer, Integer, Long>> col = streamer.context().reduce( - // This closure will execute on remote nodes. - new IgniteClosure<StreamerContext, - Collection<StreamerIndexEntry<Integer, Integer, Long>>>() { - @Override public Collection<StreamerIndexEntry<Integer, Integer, Long>> apply( - StreamerContext ctx) { - StreamerIndex<Integer, Integer, Long> view = ctx.<Integer>window().index(); - - return view.entries(-1 * POPULAR_NUMBERS_CNT); - } - }, - // The reducer will always execute locally, on the same node - // that submitted the query. - new PopularNumbersReducer()); - - for (StreamerIndexEntry<Integer, Integer, Long> cntr : col) - System.out.printf("%3d=%d\n", cntr.key(), cntr.value()); - System.out.println("----------------"); - } - catch (IgniteException e) { - e.printStackTrace(); - } + th.interrupt(); + th.join(); } - }; + catch (CacheException e) { + e.printStackTrace(); - timer.schedule(task, 3000, 3000); + System.out.println("Destroying cache for name '" + STREAM_NAME + "'. Please try again."); - return task; + ignite.destroyCache(STREAM_NAME); + } + } } /** - * Sample streamer stage to compute average. + * Populates cache in real time with numbers and keeps count for every number. + * + * @param ignite Ignite. + * @throws org.apache.ignite.IgniteException If failed. */ - @SuppressWarnings("PublicInnerClass") - public static class StreamerStage implements org.apache.ignite.streamer.StreamerStage<Integer> { - /** {@inheritDoc} */ - @Override public String name() { - return "exampleStage"; - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> nums) { - StreamerWindow<Integer> win = ctx.window(); - - // Add numbers to window. - win.enqueueAll(nums); - - // Clear evicted numbers. - win.clearEvicted(); + private static void streamData(final Ignite ignite) throws IgniteException { + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(STREAM_NAME)) { + stmr.allowOverwrite(true); // Allow data updates. + stmr.updater(new IncrementingUpdater()); - // Null means that there are no more stages - // and that stage pipeline is completed. - return null; + while (!Thread.currentThread().isInterrupted()) + stmr.addData(RAND.nextInt(RANGE), 1L); } + catch (IgniteInterruptedException ignore) {} } /** - * This class will be set as part of window index configuration. + * Increments value for key. */ - private static class IndexUpdater implements StreamerIndexUpdater<Integer, Integer, Long> { - /** {@inheritDoc} */ - @Override public Integer indexKey(Integer evt) { - // We use event as index key, so event and key are the same. - return evt; - } - - /** {@inheritDoc} */ - @Nullable @Override public Long onAdded(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) { - return entry.value() + 1; - } - - /** {@inheritDoc} */ - @Nullable @Override public Long onRemoved(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) { - return entry.value() - 1 == 0 ? null : entry.value() - 1; - } - - /** {@inheritDoc} */ - @Override public Long initialValue(Integer evt, Integer key) { - return 1L; + private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> { + @Override + public void update(IgniteCache<Integer, Long> cache, Collection<Map.Entry<Integer, Long>> entries) { + for (Map.Entry<Integer, Long> entry : entries) { + // Increment values by 1. + cache.invoke(entry.getKey(), new EntryProcessor<Integer, Long, Object>() { + @Override + public Object process(MutableEntry<Integer, Long> e, Object... args) { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + } } } }