# ignite-45 - fixing streaming.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d49f9c88 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d49f9c88 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d49f9c88 Branch: refs/heads/ignite-501 Commit: d49f9c88506b39e384b2697a33f075ceaed66cef Parents: 888f0a3 Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Thu Mar 19 01:35:27 2015 -0400 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Thu Mar 19 01:35:27 2015 -0400 ---------------------------------------------------------------------- .../ignite/examples/java8/ExamplesUtils.java | 64 ++++++-- .../StreamingPopularNumbersExample.java | 163 ------------------- .../java8/streaming/numbers/CacheConfig.java | 33 ++++ .../streaming/numbers/QueryPopularNumbers.java | 67 ++++++++ .../streaming/numbers/StreamRandomNumbers.java | 79 +++++++++ .../apache/ignite/stream/StreamTransformer.java | 9 +- 6 files changed, 234 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java index 5b62bd4..5854cdd 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java @@ -19,9 +19,9 @@ package org.apache.ignite.examples.java8; import org.apache.ignite.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.streamer.*; import java.net.*; +import java.util.*; /** * @@ -63,17 +63,15 @@ public class ExamplesUtils { /** * Checks minimum topology size for running a certain example. * - * @param prj Cluster to check size for. + * @param grp Cluster to check size for. * @param size Minimum number of nodes required to run a certain example. * @return {@code True} if check passed, {@code false} otherwise. */ - public static boolean checkMinTopologySize(ClusterGroup prj, int size) { - int prjSize = prj.nodes().size(); + public static boolean checkMinTopologySize(ClusterGroup grp, int size) { + int prjSize = grp.nodes().size(); if (prjSize < size) { - System.out.println(); - System.out.println(">>> Please start at least " + size + " cluster nodes to run example."); - System.out.println(); + System.err.println(">>> Please start at least " + size + " cluster nodes to run example."); return false; } @@ -82,18 +80,50 @@ public class ExamplesUtils { } /** - * @param ignite Ignite. - * @param name Streamer name. - * @return {@code True} if ignite has streamer with given name. + * Checks if cluster has server nodes. + * + * @param ignite Ignite instance. + * @return {@code True} if cluster has server nodes, {@code false} otherwise. */ - public static boolean hasStreamer(Ignite ignite, String name) { - if (ignite.configuration().getStreamerConfiguration() != null) { - for (StreamerConfiguration cfg : ignite.configuration().getStreamerConfiguration()) { - if (name.equals(cfg.getName())) - return true; - } + public static boolean hasServerNodes(Ignite ignite) { + if (ignite.cluster().forServers().nodes().isEmpty()) { + System.err.println("Server nodes not found (start data nodes with ExampleNodeStartup class)"); + + return false; } - return false; + return true; + } + + /** + * Convenience method for printing query results. + * + * @param res Query results. + */ + public static void printQueryResults(List<?> res) { + if (res == null) + System.out.println("Query result set is empty."); + else { + System.out.println("Query results:"); + + for (Object row : res) { + if (row instanceof List) { + System.out.print(" ("); + + List<?> l = (List)row; + + for (int i = 0; i < l.size(); i++) { + System.out.print(l.get(i)); + + if (i + 1 != l.size()) + System.out.print(','); + } + + System.out.println(')'); + } + else + System.out.println(" " + row); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java deleted file mode 100644 index 3b33402..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.java8.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.examples.java8.*; -import org.apache.ignite.stream.*; - -import javax.cache.*; -import javax.cache.configuration.*; -import javax.cache.expiry.*; -import java.util.*; -import java.util.concurrent.*; - -import static java.util.concurrent.TimeUnit.*; - -/** - * Real time popular numbers counter. - * <p> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. - * <p> - * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will - * start node with {@code examples/config/example-compute.xml} configuration. - */ -public class StreamingPopularNumbersExample { - /** Cache name. */ - private static final String STREAM_NAME = StreamingPopularNumbersExample.class.getSimpleName(); - - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** Range within which to generate numbers. */ - private static final int RANGE = 1000; - - /** Test duration. */ - private static final long DURATION = 2 * 60 * 1000; - - /** Flag indicating that the test is finished. */ - private static volatile boolean finished = false; - - /** - * Executes example. - * - * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. - */ - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { - System.out.println(); - System.out.println(">>> Cache popular numbers example started."); - - /* - * Configure streaming cache. - * ========================= - */ - CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(); - - cfg.setCacheMode(CacheMode.PARTITIONED); - cfg.setName(STREAM_NAME); - cfg.setIndexedTypes(Integer.class, Long.class); - - // Sliding window of 1 seconds. - cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, 1)))); - - /** - * Start the streaming cache on all server nodes. - * ============================================ - */ - try (IgniteCache<Integer, Long> stmCache = ignite.createCache(cfg)) { - // Check that that server nodes have been started. - if (ignite.cluster().forDataNodes(STREAM_NAME).nodes().isEmpty()) { - System.err.println("Data nodes not found (start data nodes with ExampleNodeStartup class)"); - - return; - } - - ExecutorService exe = startStreaming(ignite); - - long start = System.currentTimeMillis(); - - while (System.currentTimeMillis() - start < DURATION) { - // Select top 10 words. - SqlFieldsQuery top10 = new SqlFieldsQuery( - "select _key, _val from Long order by _val desc limit 10"); - - List<List<?>> results = stmCache.queryFields(top10).getAll(); - - for (List<?> res : results) - System.out.println(res.get(0) + "=" + res.get(1)); - - System.out.println("----------------"); - - Thread.sleep(5000); - } - - finished = true; - - exe.shutdown(); - } - catch (CacheException e) { - e.printStackTrace(); - - System.out.println("Destroying cache for name '" + STREAM_NAME + "'. Please try again."); - - ignite.destroyCache(STREAM_NAME); - } - } - } - - /** - * Populates the streaming cache in real time with numbers and keeps count for every number. - * - * @param ignite Ignite. - */ - private static ExecutorService startStreaming(final Ignite ignite) { - ExecutorService exe = Executors.newSingleThreadExecutor(); - - // Stream random numbers from another thread. - exe.submit(() -> { - try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(STREAM_NAME)) { - // Allow data updates. - stmr.allowOverwrite(true); - - // Configure data transformation to count instances of the same word. - stmr.receiver(new StreamTransformer<>((e, args) -> { - Long val = e.getValue(); - - e.setValue(val == null ? 1L : val + 1); - - return null; - })); - - - while (!finished) - stmr.addData(RAND.nextInt(RANGE), 1L); - } - }); - - return exe; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java new file mode 100644 index 0000000..76f50b1 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java @@ -0,0 +1,33 @@ +package org.apache.ignite.examples.java8.streaming.numbers; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; + +import javax.cache.configuration.*; +import javax.cache.expiry.*; + +import static java.util.concurrent.TimeUnit.*; + +/** + * Created by Dmitriy on 3/18/15. + */ +public class CacheConfig { + /** Cache name. */ + public static final String STREAM_NAME = "randomNumbers"; + + /** + * Configure streaming cache. + */ + public static CacheConfiguration<Integer, Long> configure() { + CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(); + + cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setName(STREAM_NAME); + cfg.setIndexedTypes(Integer.class, Long.class); + + // Sliding window of 1 seconds. + cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, 1)))); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java new file mode 100644 index 0000000..47be047 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming.numbers; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.examples.java8.*; + +import java.util.*; + +/** + * Real time popular numbers counter. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-compute.xml} configuration. + */ +public class QueryPopularNumbers { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { + // Start new cache or get existing one. + try (IgniteCache<Integer, Long> stmCache = ignite.createCache(CacheConfig.configure())) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + while (true) { + // Select top 10 words. + SqlFieldsQuery top10 = new SqlFieldsQuery( + "select _key, _val from Long order by _val desc limit 10"); + + // Execute query. + List<List<?>> results = stmCache.queryFields(top10).getAll(); + + ExamplesUtils.printQueryResults(results); + + Thread.sleep(5000); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java new file mode 100644 index 0000000..96472a3 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming.numbers; + +import org.apache.ignite.*; +import org.apache.ignite.examples.java8.*; +import org.apache.ignite.stream.*; + +import java.util.*; + +/** + * Real time popular numbers counter. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-compute.xml} configuration. + */ +public class StreamRandomNumbers { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { + // Create new cache or get existing one. + try (IgniteCache<Integer, Long> stmCache = ignite.createCache(CacheConfig.configure())) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same word. + stmr.receiver(new StreamTransformer<>((e, arg) -> { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + })); + + + // Stream random numbers into the streamer cache. + while (true) + stmr.addData(RAND.nextInt(RANGE), 1L); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d49f9c88/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java index 4ff9a59..ea95bf1 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java @@ -24,11 +24,18 @@ import javax.cache.processor.*; import java.util.*; /** - * Created by Dmitriy on 3/18/15. + * Convenience adapter to transform update existing values in streaming cache + * based on the previously cached value. */ public class StreamTransformer<K, V> implements StreamReceiver<K, V> { + /** Entry processor. */ private EntryProcessor<K, V, Object> ep; + /** + * Entry processor to update cache values based on the previously cached value. + * + * @param ep Entry processor. + */ public StreamTransformer(CacheEntryProcessor<K, V, Object> ep) { this.ep = ep; }