Repository: incubator-ignite Updated Branches: refs/heads/ignite-sprint-3 cf6e1afa9 -> 060ccf201
# sprint-3 - Updated streaming examples. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/060ccf20 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/060ccf20 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/060ccf20 Branch: refs/heads/ignite-sprint-3 Commit: 060ccf2018f04fb3b51d4154c8658eda020c5639 Parents: cf6e1af Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Mon Apr 6 23:25:02 2015 -0700 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Mon Apr 6 23:25:02 2015 -0700 ---------------------------------------------------------------------- .../streaming/StreamTransformerExample.java | 101 +++++++++++ .../streaming/StreamVisitorExample.java | 176 +++++++++++++++++++ .../transformers/StreamTransformerExample.java | 101 ----------- .../streaming/transformers/package-info.java | 22 --- .../visitors/StreamVisitorExample.java | 176 ------------------- .../streaming/visitors/package-info.java | 22 --- .../streaming/StreamTransformerExample.java | 97 ++++++++++ .../java8/streaming/StreamVisitorExample.java | 173 ++++++++++++++++++ .../transformers/StreamTransformerExample.java | 97 ---------- .../streaming/transformers/package-info.java | 22 --- .../visitors/StreamVisitorExample.java | 173 ------------------ .../java8/streaming/visitors/package-info.java | 22 --- 12 files changed, 547 insertions(+), 635 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java new file mode 
100644 index 0000000..d5f619b --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.stream.*; + +import javax.cache.processor.*; +import java.util.*; + +/** + * Stream random numbers into the streaming cache. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamTransformerExample}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class StreamTransformerExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. 
+ Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("randomNumbers"); + + // Index key and value. + cfg.setIndexedTypes(Integer.class, Long.class); + + // Auto-close cache at the end of the example. + try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg)) { + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same number. + stmr.receiver(new StreamTransformer<Integer, Long>() { + @Override + public Object process(MutableEntry<Integer, Long> e, Object... args) { + // Get current count. + Long val = e.getValue(); + + // Increment count by 1. + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + + // Stream 10 million of random numbers into the streamer cache. + for (int i = 1; i <= 10_000_000; i++) { + stmr.addData(RAND.nextInt(RANGE), 1L); + + if (i % 500_000 == 0) + System.out.println("Number of tuples streamed into Ignite: " + i); + } + } + + // Query top 10 most popular numbers every. + SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10"); + + // Execute queries. + List<List<?>> top10 = stmCache.query(top10Qry).getAll(); + + System.out.println("Top 10 most popular numbers:"); + + // Print top 10 words. 
+ ExamplesUtils.printQueryResults(top10); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java new file mode 100644 index 0000000..47e5538 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.stream.*; + +import java.io.*; +import java.util.*; + +/** + * Stream random numbers into the streaming cache. 
+ * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamVisitorExample}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class StreamVisitorExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** The list of instruments. */ + private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; + + /** The list of initial instrument prices. */ + private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50}; + + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // Market data cache with default configuration. + CacheConfiguration<String, Double> mktDataCfg = new CacheConfiguration<String, Double>("marketTicks"); + + // Financial instrument cache configuration. + CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache"); + + // Index key and value for querying financial instruments. + // Note that Instrument class has @QuerySqlField annotation for secondary field indexing. + instCfg.setIndexedTypes(String.class, Instrument.class); + + // Auto-close caches at the end of the example. + try ( + IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(mktDataCfg); + IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg) + ) { + try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) { + // Note that we receive market data, but do not populate 'mktCache' (it remains empty). 
+ // Instead we update the instruments in the 'instCache'. + // Since both, 'instCache' and 'mktCache' use the same key, updates are collocated. + mktStmr.receiver(new StreamVisitor<String, Double>() { + @Override + public void apply(IgniteCache<String, Double> cache, Map.Entry<String, Double> e) { + String symbol = e.getKey(); + Double tick = e.getValue(); + + Instrument inst = instCache.get(symbol); + + if (inst == null) + inst = new Instrument(symbol); + + // Don't populate market cache, as we don't use it for querying. + // Update cached instrument based on the latest market tick. + inst.update(tick); + + instCache.put(symbol, inst); + } + }); + + // Stream 10 million market data ticks into the system. + for (int i = 1; i <= 10_000_000; i++) { + int idx = RAND.nextInt(INSTRUMENTS.length); + + // Use gaussian distribution to ensure that + // numbers closer to 0 have higher probability. + double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian()); + + mktStmr.addData(INSTRUMENTS[idx], price); + + if (i % 500_000 == 0) + System.out.println("Number of tuples streamed into Ignite: " + i); + } + } + + // Select top 3 best performing instruments. + SqlFieldsQuery top3qry = new SqlFieldsQuery( + "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"); + + // Execute queries. + List<List<?>> top3 = instCache.query(top3qry).getAll(); + + System.out.println("Top performing financial instruments: "); + + // Print top 10 words. + ExamplesUtils.printQueryResults(top3); + } + } + } + + /** + * Rounds double value to two significant signs. + * + * @param val value to be rounded. + * @return rounded double value. + */ + private static double round2(double val) { + return Math.floor(100 * val + 0.5) / 100; + } + + /** + * Financial instrument. + */ + public static class Instrument implements Serializable { + /** Instrument symbol. */ + @QuerySqlField(index = true) + private final String symbol; + + /** Open price. 
*/ + @QuerySqlField(index = true) + private double open; + + /** Close price. */ + @QuerySqlField(index = true) + private double latest; + + /** + * @param symbol Symbol. + */ + Instrument(String symbol) { + this.symbol = symbol; + } + + /** + * Updates this instrument based on the latest price. + * + * @param price Latest price. + */ + public void update(double price) { + if (open == 0) + open = price; + + this.latest = price; + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" + (latest - open) + ']'; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java deleted file mode 100644 index b2fc89d..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/StreamTransformerExample.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming.transformers; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.stream.*; - -import javax.cache.processor.*; -import java.util.*; - -/** - * Stream random numbers into the streaming cache. - * To start the example, you should: - * <ul> - * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> - * <li>Start streaming using {@link StreamTransformerExample}.</li> - * </ul> - * <p> - * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class StreamTransformerExample { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** Range within which to generate numbers. */ - private static final int RANGE = 1000; - - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("randomNumbers"); - - // Index key and value. - cfg.setIndexedTypes(Integer.class, Long.class); - - // Auto-close cache at the end of the example. - try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg)) { - try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { - // Allow data updates. - stmr.allowOverwrite(true); - - // Configure data transformation to count instances of the same number. - stmr.receiver(new StreamTransformer<Integer, Long>() { - @Override - public Object process(MutableEntry<Integer, Long> e, Object... args) { - // Get current count. 
- Long val = e.getValue(); - - // Increment count by 1. - e.setValue(val == null ? 1L : val + 1); - - return null; - } - }); - - // Stream 10 million of random numbers into the streamer cache. - for (int i = 1; i <= 10_000_000; i++) { - stmr.addData(RAND.nextInt(RANGE), 1L); - - if (i % 500_000 == 0) - System.out.println("Number of tuples streamed into Ignite: " + i); - } - } - - // Query top 10 most popular numbers every. - SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10"); - - // Execute queries. - List<List<?>> top10 = stmCache.query(top10Qry).getAll(); - - System.out.println("Top 10 most popular numbers:"); - - // Print top 10 words. - ExamplesUtils.printQueryResults(top10); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/package-info.java deleted file mode 100644 index 8160531..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/transformers/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <!-- Package description. --> - * Streaming data transformation example. - */ -package org.apache.ignite.examples.streaming.transformers; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java deleted file mode 100644 index cd973ea..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/StreamVisitorExample.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming.visitors; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.cache.query.annotations.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.stream.*; - -import java.io.*; -import java.util.*; - -/** - * Stream random numbers into the streaming cache. - * To start the example, you should: - * <ul> - * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> - * <li>Start streaming using {@link StreamVisitorExample}.</li> - * </ul> - * <p> - * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class StreamVisitorExample { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** The list of instruments. */ - private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; - - /** The list of initial instrument prices. */ - private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50}; - - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - // Market data cache with default configuration. - CacheConfiguration<String, Double> mktDataCfg = new CacheConfiguration<String, Double>("marketTicks"); - - // Financial instrument cache configuration. - CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache"); - - // Index key and value for querying financial instruments. 
- // Note that Instrument class has @QuerySqlField annotation for secondary field indexing. - instCfg.setIndexedTypes(String.class, Instrument.class); - - // Auto-close caches at the end of the example. - try ( - IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(mktDataCfg); - IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg) - ) { - try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) { - // Note that we receive market data, but do not populate 'mktCache' (it remains empty). - // Instead we update the instruments in the 'instCache'. - // Since both, 'instCache' and 'mktCache' use the same key, updates are collocated. - mktStmr.receiver(new StreamVisitor<String, Double>() { - @Override - public void apply(IgniteCache<String, Double> cache, Map.Entry<String, Double> e) { - String symbol = e.getKey(); - Double tick = e.getValue(); - - Instrument inst = instCache.get(symbol); - - if (inst == null) - inst = new Instrument(symbol); - - // Don't populate market cache, as we don't use it for querying. - // Update cached instrument based on the latest market tick. - inst.update(tick); - - instCache.put(symbol, inst); - } - }); - - // Stream 10 million market data ticks into the system. - for (int i = 1; i <= 10_000_000; i++) { - int idx = RAND.nextInt(INSTRUMENTS.length); - - // Use gaussian distribution to ensure that - // numbers closer to 0 have higher probability. - double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian()); - - mktStmr.addData(INSTRUMENTS[idx], price); - - if (i % 500_000 == 0) - System.out.println("Number of tuples streamed into Ignite: " + i); - } - } - - // Select top 3 best performing instruments. - SqlFieldsQuery top3qry = new SqlFieldsQuery( - "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"); - - // Execute queries. 
- List<List<?>> top3 = instCache.query(top3qry).getAll(); - - System.out.println("Top performing financial instruments: "); - - // Print top 10 words. - ExamplesUtils.printQueryResults(top3); - } - } - } - - /** - * Rounds double value to two significant signs. - * - * @param val value to be rounded. - * @return rounded double value. - */ - private static double round2(double val) { - return Math.floor(100 * val + 0.5) / 100; - } - - /** - * Financial instrument. - */ - public static class Instrument implements Serializable { - /** Instrument symbol. */ - @QuerySqlField(index = true) - private final String symbol; - - /** Open price. */ - @QuerySqlField(index = true) - private double open; - - /** Close price. */ - @QuerySqlField(index = true) - private double latest; - - /** - * @param symbol Symbol. - */ - Instrument(String symbol) { - this.symbol = symbol; - } - - /** - * Updates this instrument based on the latest price. - * - * @param price Latest price. - */ - public void update(double price) { - if (open == 0) - open = price; - - this.latest = price; - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" + (latest - open) + ']'; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/package-info.java deleted file mode 100644 index 9c158e0..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/visitors/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <!-- Package description. --> - * Streaming data visiting example. - */ -package org.apache.ignite.examples.streaming.visitors; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamTransformerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamTransformerExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamTransformerExample.java new file mode 100644 index 0000000..fa13d20 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamTransformerExample.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.stream.*; + +import java.util.*; + +/** + * Stream random numbers into the streaming cache. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamTransformerExample}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class StreamTransformerExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("randomNumbers"); + + // Index key and value. + cfg.setIndexedTypes(Integer.class, Long.class); + + // Auto-close cache at the end of the example. + try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg)) { + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. 
+ stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same number. + stmr.receiver(StreamTransformer.from((e, arg) -> { + // Get current count. + Long val = e.getValue(); + + // Increment count by 1. + e.setValue(val == null ? 1L : val + 1); + + return null; + })); + + // Stream 10 million of random numbers into the streamer cache. + for (int i = 1; i <= 10_000_000; i++) { + stmr.addData(RAND.nextInt(RANGE), 1L); + + if (i % 500_000 == 0) + System.out.println("Number of tuples streamed into Ignite: " + i); + } + } + + // Query top 10 most popular numbers every. + SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10"); + + // Execute queries. + List<List<?>> top10 = stmCache.query(top10Qry).getAll(); + + System.out.println("Top 10 most popular numbers:"); + + // Print top 10 words. + ExamplesUtils.printQueryResults(top10); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamVisitorExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamVisitorExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamVisitorExample.java new file mode 100644 index 0000000..2fa7f16 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamVisitorExample.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.stream.*; + +import java.io.*; +import java.util.*; + +/** + * Stream random numbers into the streaming cache. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link StreamVisitorExample}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class StreamVisitorExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** The list of instruments. */ + private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; + + /** The list of initial instrument prices. */ + private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50}; + + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // Market data cache with default configuration. 
+ CacheConfiguration<String, Double> mktDataCfg = new CacheConfiguration<String, Double>("marketTicks"); + + // Financial instrument cache configuration. + CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache"); + + // Index key and value for querying financial instruments. + // Note that Instrument class has @QuerySqlField annotation for secondary field indexing. + instCfg.setIndexedTypes(String.class, Instrument.class); + + // Auto-close caches at the end of the example. + try ( + IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(mktDataCfg); + IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg) + ) { + try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) { + // Note that we receive market data, but do not populate 'mktCache' (it remains empty). + // Instead we update the instruments in the 'instCache'. + // Since both, 'instCache' and 'mktCache' use the same key, updates are collocated. + mktStmr.receiver(StreamVisitor.from((cache, e) -> { + String symbol = e.getKey(); + Double tick = e.getValue(); + + Instrument inst = instCache.get(symbol); + + if (inst == null) + inst = new Instrument(symbol); + + // Don't populate market cache, as we don't use it for querying. + // Update cached instrument based on the latest market tick. + inst.update(tick); + + instCache.put(symbol, inst); + })); + + // Stream 10 million market data ticks into the system. + for (int i = 1; i <= 10_000_000; i++) { + int idx = RAND.nextInt(INSTRUMENTS.length); + + // Use gaussian distribution to ensure that + // numbers closer to 0 have higher probability. + double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian()); + + mktStmr.addData(INSTRUMENTS[idx], price); + + if (i % 500_000 == 0) + System.out.println("Number of tuples streamed into Ignite: " + i); + } + } + + // Select top 3 best performing instruments. 
+ SqlFieldsQuery top3qry = new SqlFieldsQuery( + "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"); + + // Execute queries. + List<List<?>> top3 = instCache.query(top3qry).getAll(); + + System.out.println("Top performing financial instruments: "); + + // Print top 3 performing instruments. + ExamplesUtils.printQueryResults(top3); + } + } + } + + /** + * Rounds double value to two decimal places. + * + * @param val value to be rounded. + * @return rounded double value. + */ + private static double round2(double val) { + return Math.floor(100 * val + 0.5) / 100; + } + + /** + * Financial instrument. + */ + public static class Instrument implements Serializable { + /** Instrument symbol. */ + @QuerySqlField(index = true) + private final String symbol; + + /** Open price. */ + @QuerySqlField(index = true) + private double open; + + /** Latest price. */ + @QuerySqlField(index = true) + private double latest; + + /** + * @param symbol Symbol. + */ + Instrument(String symbol) { + this.symbol = symbol; + } + + /** + * Updates this instrument based on the latest price. + * + * @param price Latest price.
+ */ + public void update(double price) { + if (open == 0) + open = price; + + this.latest = price; + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" + (latest - open) + ']'; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java deleted file mode 100644 index 4ea1245..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.examples.java8.streaming.transformers; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.stream.*; - -import java.util.*; - -/** - * Stream random numbers into the streaming cache. - * To start the example, you should: - * <ul> - * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> - * <li>Start streaming using {@link StreamTransformerExample}.</li> - * </ul> - * <p> - * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class StreamTransformerExample { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** Range within which to generate numbers. */ - private static final int RANGE = 1000; - - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("randomNumbers"); - - // Index key and value. - cfg.setIndexedTypes(Integer.class, Long.class); - - // Auto-close cache at the end of the example. - try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg)) { - try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { - // Allow data updates. - stmr.allowOverwrite(true); - - // Configure data transformation to count instances of the same number. - stmr.receiver(StreamTransformer.from((e, arg) -> { - // Get current count. - Long val = e.getValue(); - - // Increment count by 1. - e.setValue(val == null ? 1L : val + 1); - - return null; - })); - - // Stream 10 million of random numbers into the streamer cache. 
- for (int i = 1; i <= 10_000_000; i++) { - stmr.addData(RAND.nextInt(RANGE), 1L); - - if (i % 500_000 == 0) - System.out.println("Number of tuples streamed into Ignite: " + i); - } - } - - // Query top 10 most popular numbers every. - SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10"); - - // Execute queries. - List<List<?>> top10 = stmCache.query(top10Qry).getAll(); - - System.out.println("Top 10 most popular numbers:"); - - // Print top 10 words. - ExamplesUtils.printQueryResults(top10); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/package-info.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/package-info.java deleted file mode 100644 index 7b37ca7..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <!-- Package description. --> - * Streaming data transformation example. - */ -package org.apache.ignite.examples.java8.streaming.transformers; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java deleted file mode 100644 index 009a7c3..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.examples.java8.streaming.visitors; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.cache.query.annotations.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.stream.*; - -import java.io.*; -import java.util.*; - -/** - * Stream random numbers into the streaming cache. - * To start the example, you should: - * <ul> - * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> - * <li>Start streaming using {@link StreamVisitorExample}.</li> - * </ul> - * <p> - * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class StreamVisitorExample { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** The list of instruments. */ - private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; - - /** The list of initial instrument prices. */ - private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50}; - - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - // Market data cache with default configuration. - CacheConfiguration<String, Double> mktDataCfg = new CacheConfiguration<String, Double>("marketTicks"); - - // Financial instrument cache configuration. - CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache"); - - // Index key and value for querying financial instruments. - // Note that Instrument class has @QuerySqlField annotation for secondary field indexing. 
- instCfg.setIndexedTypes(String.class, Instrument.class); - - // Auto-close caches at the end of the example. - try ( - IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(mktDataCfg); - IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg) - ) { - try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) { - // Note that we receive market data, but do not populate 'mktCache' (it remains empty). - // Instead we update the instruments in the 'instCache'. - // Since both, 'instCache' and 'mktCache' use the same key, updates are collocated. - mktStmr.receiver(StreamVisitor.from((cache, e) -> { - String symbol = e.getKey(); - Double tick = e.getValue(); - - Instrument inst = instCache.get(symbol); - - if (inst == null) - inst = new Instrument(symbol); - - // Don't populate market cache, as we don't use it for querying. - // Update cached instrument based on the latest market tick. - inst.update(tick); - - instCache.put(symbol, inst); - })); - - // Stream 10 million market data ticks into the system. - for (int i = 1; i <= 10_000_000; i++) { - int idx = RAND.nextInt(INSTRUMENTS.length); - - // Use gaussian distribution to ensure that - // numbers closer to 0 have higher probability. - double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian()); - - mktStmr.addData(INSTRUMENTS[idx], price); - - if (i % 500_000 == 0) - System.out.println("Number of tuples streamed into Ignite: " + i); - } - } - - // Select top 3 best performing instruments. - SqlFieldsQuery top3qry = new SqlFieldsQuery( - "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"); - - // Execute queries. - List<List<?>> top3 = instCache.query(top3qry).getAll(); - - System.out.println("Top performing financial instruments: "); - - // Print top 10 words. - ExamplesUtils.printQueryResults(top3); - } - } - } - - /** - * Rounds double value to two significant signs. - * - * @param val value to be rounded. 
- * @return rounded double value. - */ - private static double round2(double val) { - return Math.floor(100 * val + 0.5) / 100; - } - - /** - * Financial instrument. - */ - public static class Instrument implements Serializable { - /** Instrument symbol. */ - @QuerySqlField(index = true) - private final String symbol; - - /** Open price. */ - @QuerySqlField(index = true) - private double open; - - /** Close price. */ - @QuerySqlField(index = true) - private double latest; - - /** - * @param symbol Symbol. - */ - Instrument(String symbol) { - this.symbol = symbol; - } - - /** - * Updates this instrument based on the latest price. - * - * @param price Latest price. - */ - public void update(double price) { - if (open == 0) - open = price; - - this.latest = price; - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" + (latest - open) + ']'; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/060ccf20/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/package-info.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/package-info.java deleted file mode 100644 index 806f0a1..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <!-- Package description. --> - * Streaming data visiting example. - */ -package org.apache.ignite.examples.java8.streaming.visitors;