http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingCheckInExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingCheckInExample.java b/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingCheckInExample.java new file mode 100644 index 0000000..7f9959c --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingCheckInExample.java @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.examples8.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.streamer.*; +import org.apache.ignite.streamer.index.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * This streamer example is inspired by <a href="https://foursquare.com/">Foursquare</a> + * project. It shows the usage of window indexes, and, particularly, + * unique index, which allows to skip some duplicated events based on a certain + * criteria. + * <p> + * In this example we have a number of places with 2D locations, and a number + * of users who perform check-ins in random locations from time to time. + * A check-in is a streamer event, which is processed through the pipeline + * of 2 stages and added sequentially to two windows, both of which hold entries + * within past 10 seconds (the rest of the entries are evicted). + * <p> + * First stage simply ensures the user does not check-in twice within the + * 10 second time interval. This check is done by using a unique window hash index + * with user name as a key. In case of a duplicate, an error is handled and + * reported. + * <p> + * Second stage checks if a user has checked-in in one of the tracked places. + * If that's the case, then an info entry is added to a second window. Again, + * it is only valid for 10 seconds, and is evicted afterwards. + * <p> + * There is a separate timer task, which polls a second window index and displays + * the users that have checked-in in the known places within the last 10 seconds. + * <p> + * Remote nodes should always be started with special configuration file: + * {@code 'ignite.{sh|bat} examples/config/example-streamer.xml'}. + * When starting nodes this way JAR file containing the examples code + * should be placed to {@code IGNITE_HOME/libs} folder. You can build + * {@code ignite-examples.jar} by running {@code mvn package} in + * {@code IGNITE_HOME/examples} folder. After that {@code ignite-examples.jar} + * will be generated by Maven in {@code IGNITE_HOME/examples/target} folder. + * <p> + * Alternatively you can run {@link org.apache.ignite.examples8.streaming.StreamingNodeStartup} in another JVM which will start node + * with {@code examples/config/example-streamer.xml} configuration. + */ +public class StreamingCheckInExample { + /** Streamer name. */ + private static final String STREAMER_NAME = "check-in"; + + /** + * Nearby distance. Locations with distance less than or equal + * to this one are considered to be nearby. + */ + private static final double NEARBY_DISTANCE = 5.0d; + + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Total number of events to generate. */ + private static final int CNT = 60; + + /** User names. */ + private static final String[] USER_NAMES = { + "Alice", "Bob", "Ann", "Joe", "Mary", "Peter", "Lisa", "Tom", "Kate", "Sam" + }; + + /** Places, for which to track user check-ins. */ + private static final Place[] TRACKED_PLACES = { + new Place("Theatre", new Location(1.234, 2.567)), + new Place("Bowling", new Location(10.111, 5.213)), + new Place("Bar", new Location(15.199, 16.781)), + new Place("Cinema", new Location(3.77, 20.239)) + }; + + /** Max X coordinate. */ + private static final int MAX_X = 30; + + /** Max Y coordinate. */ + private static final int MAX_Y = 30; + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws org.apache.ignite.IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + Timer timer = new Timer("check-in-query-worker"); + + // Start ignite. + final Ignite ignite = Ignition.start("examples/config/example-streamer.xml"); + + System.out.println(); + System.out.println(">>> Streaming check-in example started."); + + try { + // Get the streamer. + IgniteStreamer streamer = ignite.streamer(STREAMER_NAME); + + assert streamer != null; + + // Add a failure listener. + streamer.addStreamerFailureListener((stageName, evts, err) -> System.err.println("Failure [stage=" + stageName + ", evts=" + evts + ", err=" + err.getMessage())); + + // Periodically display users, who have checked-in in known places. + scheduleQuery(streamer, timer); + + // Stream the check-in events. + streamData(streamer); + + timer.cancel(); + + // Reset all streamers on all nodes to make sure that + // consecutive executions start from scratch. + ignite.compute().broadcast(() -> { + if (!ExamplesUtils.hasStreamer(ignite, STREAMER_NAME)) + System.err.println("Default streamer not found (is example-streamer.xml " + + "configuration used on all nodes?)"); + else { + IgniteStreamer streamer1 = ignite.streamer(STREAMER_NAME); + + System.out.println("Clearing streamer data."); + + streamer1.reset(); + } + }); + } + finally { + Ignition.stop(true); + } + } + + /** + * Schedules the query to periodically output the users, who have + * checked-in in tracked places. + * + * @param streamer Streamer. + * @param timer Timer. + */ + private static void scheduleQuery(final IgniteStreamer streamer, Timer timer) { + TimerTask task = new TimerTask() { + @Override public void run() { + try { + // Send reduce query to all streamers running on local and remote noes. + Map<String, Place> userPlaces = streamer.context().reduce( + // This closure will execute on remote nodes. + ctx -> { + StreamerWindow<LocationInfo> win = + ctx.window(DetectPlacesStage.class.getSimpleName()); + + assert win != null; + + StreamerIndex<LocationInfo, String, Place> idxView = win.index(); + + Collection<StreamerIndexEntry<LocationInfo, String, Place>> entries = + idxView.entries(0); + + Map<String, Place> ret = new HashMap<>(entries.size(), 1.0f); + + for (StreamerIndexEntry<LocationInfo, String, Place> e : entries) + ret.put(e.key(), e.value()); + + return ret; + }, + new IgniteReducer<Map<String, Place>, Map<String, Place>>() { + private Map<String, Place> map; + + @Override public boolean collect(@Nullable Map<String, Place> m) { + if (m == null) + return false; + + if (map != null) + map.putAll(m); + else + map = m; + + return true; + } + + @Override public Map<String, Place> reduce() { + return map; + } + } + ); + + StringBuilder sb = new StringBuilder("----------------\n"); + + for (Map.Entry<String, Place> userPlace : userPlaces.entrySet()) + sb.append(String.format("%s is at the %s (%s)\n", userPlace.getKey(), + userPlace.getValue().name(), userPlace.getValue().location())); + + sb.append("----------------\n"); + + System.out.print(sb.toString()); + } + catch (IgniteException e) { + e.printStackTrace(); + } + } + }; + + // Run task every 3 seconds. + timer.schedule(task, 3000, 3000); + } + + /** + * Streams check-in events into the system. + * + * @param streamer Streamer. + * @throws org.apache.ignite.IgniteException If failed. + */ + @SuppressWarnings("BusyWait") + private static void streamData(IgniteStreamer streamer) throws IgniteException { + try { + for (int i = 0; i < CNT; i++) { + CheckInEvent evt = new CheckInEvent( + USER_NAMES[ThreadLocalRandom.current().nextInt(USER_NAMES.length)], + new Location( + RAND.nextDouble() + RAND.nextInt(MAX_X - 1), + RAND.nextDouble() + RAND.nextInt(MAX_Y)) + ); + + System.out.println(">>> Generating event: " + evt); + + streamer.addEvent(evt); + + Thread.sleep(1000); + } + } + catch (InterruptedException ignored) { + // No-op. + } + } + + /** + * Entity class that represents a 2D location. + */ + private static class Location { + /** Check-in location on X axis (longitude). */ + private final double x; + + /** Check-in location on Y axis (latitude). */ + private final double y; + + /** + * @param x X value. + * @param y Y value. + */ + Location(double x, double y) { + this.x = x; + this.y = y; + } + + /** + * @return Check-in location on X axis (longitude). + */ + public double x() { + return x; + } + + /** + * @return Check-in location on Y axis (latitude). + */ + public double y() { + return y; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "Location [x=" + x + ", y=" + y + ']'; + } + } + + /** + * Entity class representing a place, where + * users can check-in. + */ + private static class Place { + /** Place name. */ + private final String name; + + /** Location. */ + private final Location location; + + /** + * @param name Name. + * @param location Location. + */ + Place(String name, Location location) { + this.name = name; + this.location = location; + } + + /** + * @return Place name. + */ + public String name() { + return name; + } + + /** + * @return Location. + */ + public Location location() { + return location; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "Place [name=" + name + ", location=" + location + ']'; + } + } + + /** + * Check-in event. + */ + private static class CheckInEvent { + /** User name. */ + private final String userName; + + /** User location. */ + private final Location location; + + /** + * @param userName User name. + * @param location Location. + */ + CheckInEvent(String userName, Location location) { + this.userName = userName; + this.location = location; + } + + /** + * @return User name. + */ + public String userName() { + return userName; + } + + /** + * @return User location. + */ + public Location location() { + return location; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CheckInEvent [userName=" + userName + ", location=" + location + ']'; + } + } + + /** + * Helper data structure for keeping information about + * check-in location and a corresponding place if found. + */ + private static class LocationInfo { + /** User name. */ + private final String userName; + + /** A detected check-in place. */ + private final Place place; + + /** + * @param userName User name. + * @param place Place. + */ + LocationInfo(String userName, Place place) { + this.userName = userName; + this.place = place; + } + + /** + * @return User name. + */ + public String userName() { + return userName; + } + + /** + * @return A detected check-in place. + */ + public Place place() { + return place; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "LocationInfo [userName=" + userName + ", place=" + place + ']'; + } + } + + /** + * Check-in event processing stage that adds events window + * with unique index to block repetitive check-ins. + */ + @SuppressWarnings("PublicInnerClass") + public static class AddToWindowStage implements StreamerStage<CheckInEvent> { + /** {@inheritDoc} */ + @Override public String name() { + return getClass().getSimpleName(); + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<String, Collection<?>> run( + StreamerContext ctx, Collection<CheckInEvent> evts) { + StreamerWindow<CheckInEvent> win = ctx.window(name()); + + assert win != null; + + Collection<CheckInEvent> evts0 = new LinkedList<>(); + + // Add events to window. Our unique index should reject + // repetitive check-ins within a period of time, defined + // by the window. + for (CheckInEvent evt : evts) { + try { + win.enqueue(evt); + + evts0.add(evt); + } + catch (IgniteException e) { + if (e.getMessage().contains("Index unique key violation")) + System.err.println("Cannot check-in twice within the specified period of time [evt=" + evt + ']'); + else + throw e; + } + } + + // Clear evicted events. + win.pollEvictedAll(); + + // Move to the next stage in pipeline, if there are valid events. + if (!evts0.isEmpty()) + return Collections.<String, Collection<?>>singletonMap(ctx.nextStageName(), evts0); + + // Break the pipeline execution. + return null; + } + } + + /** + * Check-in event processing stage that detects the + * check-in places. + */ + private static class DetectPlacesStage implements StreamerStage<CheckInEvent> { + /** {@inheritDoc} */ + @Override public String name() { + return getClass().getSimpleName(); + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, + Collection<CheckInEvent> evts) { + StreamerWindow<LocationInfo> win = ctx.window(name()); + + assert win != null; + + for (CheckInEvent evt : evts) { + for (Place place : TRACKED_PLACES) { + if (distance(evt.location(), place.location()) <= NEARBY_DISTANCE) { + win.enqueue(new LocationInfo(evt.userName(), place)); + + break; + } + } + } + + // Clear evicted location infos. + win.pollEvictedAll(); + + // Null means there are no more stages and + // we should finish the pipeline. + return null; + } + + /** + * Calculates the distance between 2 locations. + * + * @param loc1 First location. + * @param loc2 Second location. + * @return Distance between locations. + */ + private double distance(Location loc1, Location loc2) { + double xDiff = Math.abs(loc1.x() - loc2.x()); + double yDiff = Math.abs(loc1.y() - loc2.y()); + + // Return a vector distance between the points. + return Math.sqrt(xDiff * xDiff + yDiff * yDiff); + } + } + + /** + * Index updater for check-in events. Updaters are specified for {@link org.apache.ignite.streamer.index.StreamerIndexProviderAdapter} in + * streamer configuration. + */ + private static class CheckInEventIndexUpdater implements StreamerIndexUpdater<CheckInEvent, String, Location> { + /** {@inheritDoc} */ + @Nullable @Override public String indexKey(CheckInEvent evt) { + return evt.userName(); // Index key is an event user name. + } + + /** {@inheritDoc} */ + @Nullable @Override public Location initialValue(CheckInEvent evt, String key) { + return evt.location(); // Index value is an event location. + } + + /** {@inheritDoc} */ + @Nullable @Override public Location onAdded( + StreamerIndexEntry<CheckInEvent, String, Location> entry, + CheckInEvent evt) { + throw new AssertionError("onAdded() shouldn't be called on unique index."); + } + + /** {@inheritDoc} */ + @Nullable @Override public Location onRemoved( + StreamerIndexEntry<CheckInEvent, String, Location> entry, + CheckInEvent evt) { + return null; + } + } + + /** + * Index updater for location info. Updaters are specified for {@link org.apache.ignite.streamer.index.StreamerIndexProviderAdapter} in + * streamer configuration. + */ + private static class PlacesIndexUpdater implements StreamerIndexUpdater<LocationInfo, String, Place> { + /** {@inheritDoc} */ + @Nullable @Override public String indexKey(LocationInfo info) { + return info.userName(); + } + + /** {@inheritDoc} */ + @Nullable @Override public Place initialValue(LocationInfo info, String key) { + return info.place(); + } + + /** {@inheritDoc} */ + @Nullable @Override public Place onAdded( + StreamerIndexEntry<LocationInfo, String, Place> entry, + LocationInfo evt) { + throw new AssertionError("onAdded() shouldn't be called on unique index."); + } + + /** {@inheritDoc} */ + @Nullable @Override public Place onRemoved( + StreamerIndexEntry<LocationInfo, String, Place> entry, + LocationInfo evt) { + return null; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingNodeStartup.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingNodeStartup.java b/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingNodeStartup.java new file mode 100644 index 0000000..60ed1cd --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingNodeStartup.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8.streaming; + +import org.apache.ignite.*; + +/** + * Starts up node with example streamer configuration. + */ +public class StreamingNodeStartup { + /** + * Start up an empty node with specified cache configuration. + * + * @param args Command line arguments, none required. + * @throws org.apache.ignite.IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + Ignition.start("examples/config/example-streamer.xml"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingPopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingPopularNumbersExample.java new file mode 100644 index 0000000..75f0956 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingPopularNumbersExample.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.examples8.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.streamer.*; +import org.apache.ignite.streamer.index.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Real time streaming popular numbers counter. This example receives a constant stream of + * random numbers. The gaussian distribution is chosen to make sure that numbers closer + * to 0 have higher probability. This example will find {@link #POPULAR_NUMBERS_CNT} number + * of popular numbers over last N number of numbers, where N is specified as streamer + * window size in {@code examples/config/example-streamer.xml} configuration file and + * is set to {@code 10,000}. + * <p> + * Remote nodes should always be started with special configuration file: + * {@code 'ignite.{sh|bat} examples/config/example-streamer.xml'}. + * When starting nodes this way JAR file containing the examples code + * should be placed to {@code IGNITE_HOME/libs} folder. You can build + * {@code ignite-examples.jar} by running {@code mvn package} in + * {@code IGNITE_HOME/examples} folder. After that {@code ignite-examples.jar} + * will be generated by Maven in {@code IGNITE_HOME/examples/target} folder. + * <p> + * Alternatively you can run {@link org.apache.ignite.examples8.streaming.StreamingNodeStartup} in another JVM which will start node + * with {@code examples/config/example-streamer.xml} configuration. + */ +public class StreamingPopularNumbersExample { + /** Count of most popular numbers to retrieve from ignite. */ + private static final int POPULAR_NUMBERS_CNT = 10; + + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Count of total numbers to generate. */ + private static final int CNT = 10_000_000; + + /** Comparator sorting random number entries by number popularity. */ + private static final Comparator<StreamerIndexEntry<Integer, Integer, Long>> CMP = + (e1, e2) -> e2.value().compareTo(e1.value()); + + /** Reducer selecting first POPULAR_NUMBERS_CNT values. */ + private static class PopularNumbersReducer implements IgniteReducer<Collection<StreamerIndexEntry<Integer, Integer, Long>>, + Collection<StreamerIndexEntry<Integer, Integer, Long>>> { + /** */ + private final List<StreamerIndexEntry<Integer, Integer, Long>> sorted = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override public boolean collect(@Nullable Collection<StreamerIndexEntry<Integer, Integer, Long>> col) { + if (col != null && !col.isEmpty()) + // Add result from remote node to sorted set. + sorted.addAll(col); + + return true; + } + + /** {@inheritDoc} */ + @Override public Collection<StreamerIndexEntry<Integer, Integer, Long>> reduce() { + Collections.sort(sorted, CMP); + + return sorted.subList(0, POPULAR_NUMBERS_CNT < sorted.size() ? POPULAR_NUMBERS_CNT : sorted.size()); + } + } + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws org.apache.ignite.IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + Timer popularNumbersQryTimer = new Timer("numbers-query-worker"); + + // Start ignite. + final Ignite ignite = Ignition.start("examples/config/example-streamer.xml"); + + System.out.println(); + System.out.println(">>> Streaming popular numbers example started."); + + try { + // Schedule query to find most popular words to run every 3 seconds. + TimerTask task = scheduleQuery(ignite, popularNumbersQryTimer); + + streamData(ignite); + + // Force one more run to get final counts. + task.run(); + + popularNumbersQryTimer.cancel(); + + // Reset all streamers on all nodes to make sure that + // consecutive executions start from scratch. + ignite.compute().broadcast(() -> { + if (!ExamplesUtils.hasStreamer(ignite, "popular-numbers")) + System.err.println("Default streamer not found (is example-streamer.xml " + + "configuration used on all nodes?)"); + else { + IgniteStreamer streamer = ignite.streamer("popular-numbers"); + + System.out.println("Clearing number counters from streamer."); + + streamer.reset(); + } + }); + } + finally { + Ignition.stop(true); + } + } + + /** + * Streams random numbers into the system. + * + * @param ignite Ignite. + * @throws org.apache.ignite.IgniteException If failed. + */ + private static void streamData(final Ignite ignite) throws IgniteException { + final IgniteStreamer streamer = ignite.streamer("popular-numbers"); + + // Use gaussian distribution to ensure that + // numbers closer to 0 have higher probability. + for (int i = 0; i < CNT; i++) + streamer.addEvent(((Double)(RAND.nextGaussian() * 10)).intValue()); + } + + /** + * Schedules our popular numbers query to run every 3 seconds. + * + * @param ignite Ignite. + * @param timer Timer. + * @return Scheduled task. + */ + private static TimerTask scheduleQuery(final Ignite ignite, Timer timer) { + TimerTask task = new TimerTask() { + @Override public void run() { + final IgniteStreamer streamer = ignite.streamer("popular-numbers"); + + try { + // Send reduce query to all 'popular-numbers' streamers + // running on local and remote nodes. + Collection<StreamerIndexEntry<Integer, Integer, Long>> col = streamer.context().reduce( + // This closure will execute on remote nodes. + ctx -> { + StreamerIndex<Integer, Integer, Long> view = ctx.<Integer>window().index(); + + return view.entries(-1 * POPULAR_NUMBERS_CNT); + }, + // The reducer will always execute locally, on the same node + // that submitted the query. + new PopularNumbersReducer()); + + for (StreamerIndexEntry<Integer, Integer, Long> cntr : col) + System.out.printf("%3d=%d\n", cntr.key(), cntr.value()); + + System.out.println("----------------"); + } + catch (IgniteException e) { + e.printStackTrace(); + } + } + }; + + timer.schedule(task, 3000, 3000); + + return task; + } + + /** + * Sample streamer stage to compute average. + */ + @SuppressWarnings("PublicInnerClass") + public static class StreamerStage implements org.apache.ignite.streamer.StreamerStage<Integer> { + /** {@inheritDoc} */ + @Override public String name() { + return "exampleStage"; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> nums) { + StreamerWindow<Integer> win = ctx.window(); + + // Add numbers to window. + win.enqueueAll(nums); + + // Clear evicted numbers. + win.clearEvicted(); + + // Null means that there are no more stages + // and that stage pipeline is completed. + return null; + } + } + + /** + * This class will be set as part of window index configuration. + */ + private static class IndexUpdater implements StreamerIndexUpdater<Integer, Integer, Long> { + /** {@inheritDoc} */ + @Override public Integer indexKey(Integer evt) { + // We use event as index key, so event and key are the same. + return evt; + } + + /** {@inheritDoc} */ + @Nullable @Override public Long onAdded(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) { + return entry.value() + 1; + } + + /** {@inheritDoc} */ + @Nullable @Override public Long onRemoved(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) { + return entry.value() - 1 == 0 ? null : entry.value() - 1; + } + + /** {@inheritDoc} */ + @Override public Long initialValue(Integer evt, Integer key) { + return 1L; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingPriceBarsExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingPriceBarsExample.java b/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingPriceBarsExample.java new file mode 100644 index 0000000..65ffc30 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingPriceBarsExample.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.examples8.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.streamer.*; +import org.apache.ignite.streamer.router.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Example to demonstrate streamer with multiple stages. This example builds price + * bars which are entities that aggregate prices over some time interval. For each + * interval a price bar holds the following metrics: + * <ul> + * <li>Open price - the first price in the time interval.</li> + * <li>High price - the highest price in the interval.</li> + * <li>Low price - the lowest price in the interval.</li> + * <li>Close prices - the last price in the interval.</li> + * </ul> + * In this example trade quotes for several instruments are streamed into the system. + * Constructing of price bars is performed in two stages. The first stage builds price bars + * for one second intervals, and the second stage used results of the first stage to + * build price bars for every 2 second interval. + * <p> + * Note, the bars in the example are not real charting bars, but rather a simplification + * with purpose to demonstrate multi-stage streaming processing. + * <p> + * Remote nodes should always be started with special configuration file: + * {@code 'ignite.{sh|bat} examples/config/example-streamer.xml'}. + * When starting nodes this way JAR file containing the examples code + * should be placed to {@code IGNITE_HOME/libs} folder. You can build + * {@code ignite-examples.jar} by running {@code mvn package} in + * {@code IGNITE_HOME/examples} folder. After that {@code ignite-examples.jar} + * will be generated by Maven in {@code IGNITE_HOME/examples/target} folder. + * <p> + * Alternatively you can run {@link org.apache.ignite.examples8.streaming.StreamingNodeStartup} in another JVM which will start node + * with {@code examples/config/example-streamer.xml} configuration. + */ +public class StreamingPriceBarsExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Count of total numbers to generate. */ + private static final int CNT = 10000000; + + /** The list of instruments. */ + private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE"}; + + /** The list of initial instrument prices. */ + private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24}; + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws org.apache.ignite.IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + Timer timer = new Timer("priceBars"); + + // Start ignite. + final Ignite ignite = Ignition.start("examples/config/example-streamer.xml"); + + System.out.println(); + System.out.println(">>> Streaming price bars example started."); + + try { + TimerTask task = scheduleQuery(ignite, timer); + + streamData(ignite); + + // Force one more run to get final results. + task.run(); + + timer.cancel(); + + // Reset all streamers on all nodes to make sure that + // consecutive executions start from scratch. + ignite.compute().broadcast(() -> { + if (!ExamplesUtils.hasStreamer(ignite, "priceBars")) + System.err.println("Default streamer not found (is example-streamer.xml " + + "configuration used on all nodes?)"); + else { + IgniteStreamer streamer = ignite.streamer("priceBars"); + + System.out.println("Clearing bars from streamer."); + + streamer.reset(); + } + }); + } + finally { + Ignition.stop(true); + } + } + + /** + * Schedules the query to periodically output built bars to the console. + * + * @param ignite Ignite. + * @param timer Timer. + * @return Scheduled task. + */ + private static TimerTask scheduleQuery(final Ignite ignite, Timer timer) { + TimerTask task = new TimerTask() { + @Override public void run() { + final IgniteStreamer streamer = ignite.streamer("priceBars"); + + try { + Collection<Bar> bars = streamer.context().reduce( + // This closure will execute on remote nodes. + ctx -> { + Collection<Bar> values = ctx.<String, Bar>localSpace().values(); + + Collection<Bar> res = new ArrayList<>(values.size()); + + for (Bar bar : values) + res.add(bar.copy()); + + return res; + }, + // The reducer will always execute locally, on the same node + // that submitted the query. + new IgniteReducer<Collection<Bar>, Collection<Bar>>() { + private final Collection<Bar> res = new ArrayList<>(); + + @Override public boolean collect(@Nullable Collection<Bar> col) { + res.addAll(col); + + return true; + } + + @Override public Collection<Bar> reduce() { + return res; + } + } + ); + + for (Bar bar : bars) + System.out.println(bar.toString()); + + System.out.println("-----------------"); + } + catch (IgniteException e) { + e.printStackTrace(); + } + } + }; + + timer.schedule(task, 2000, 2000); + + return task; + } + + /** + * Streams random prices into the system. + * + * @param ignite Ignite. + * @throws org.apache.ignite.IgniteException If failed. + */ + private static void streamData(final Ignite ignite) throws IgniteException { + IgniteStreamer streamer = ignite.streamer("priceBars"); + + for (int i = 0; i < CNT; i++) { + for (int j = 0; j < INSTRUMENTS.length; j++) { + // Use gaussian distribution to ensure that + // numbers closer to 0 have higher probability. + double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian()); + + Quote quote = new Quote(INSTRUMENTS[j], price); + + streamer.addEvent(quote); + } + } + } + + /** + * Rounds double value to two significant signs. + * + * @param val value to be rounded. + * @return rounded double value. + */ + private static double round2(double val) { + return Math.floor(100 * val + 0.5) / 100; + } + + /** + * Trade quote that is streamed into the system. + */ + private static class Quote implements StreamerAffinityEventRouter.AffinityEvent { + /** Instrument symbol. */ + private final String symbol; + + /** Price. */ + private final double price; + + /** + * @param symbol Symbol. + * @param price Price. + */ + Quote(String symbol, double price) { + this.symbol = symbol; + this.price = price; + } + + /** + * @return Symbol. + */ + public String symbol() { + return symbol; + } + + /** + * @return Price. + */ + public double price() { + return price; + } + + /** {@inheritDoc} */ + @Override public String affinityKey() { + return symbol; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "Quote [symbol=" + symbol + ", price=" + price + ']'; + } + } + + /** + * The bar that is build by the streamer. + */ + private static class Bar implements StreamerAffinityEventRouter.AffinityEvent { + /** Instrument symbol. */ + private final String symbol; + + /** Open price. */ + private volatile double open; + + /** High price. */ + private volatile double high; + + /** Low price. */ + private volatile double low = Long.MAX_VALUE; + + /** Close price. */ + private volatile double close; + + /** + * @param symbol Symbol. + */ + Bar(String symbol) { + this.symbol = symbol; + } + + /** + * @return Copy of this instance. + */ + public synchronized Bar copy() { + Bar res = new Bar(symbol); + + res.open = open; + res.high = high; + res.low = low; + res.close = close; + + return res; + } + + /** + * Updates this bar with last price. + * + * @param price Price. + */ + public synchronized void update(double price) { + if (open == 0) + open = price; + + high = Math.max(high, price); + low = Math.min(low, price); + close = price; + } + + /** + * Updates this bar with next bar. + * + * @param bar Next bar. + */ + public synchronized void update(Bar bar) { + if (open == 0) + open = bar.open; + + high = Math.max(high, bar.high); + low = Math.min(low, bar.low); + close = bar.close; + } + + /** + * @return Symbol. + */ + public String symbol() { + return symbol; + } + + /** + * @return Open price. + */ + public double open() { + return open; + } + + /** + * @return High price. + */ + public double high() { + return high; + } + + /** + * @return Low price. + */ + public double low() { + return low; + } + + /** + * @return Close price. + */ + public double close() { + return close; + } + + /** {@inheritDoc} */ + @Override public String affinityKey() { + return symbol; + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return "Bar [symbol=" + symbol + ", open=" + open + ", high=" + high + ", low=" + low + + ", close=" + close + ']'; + } + } + + /** + * The first stage where 1 second bars are built. + */ + @SuppressWarnings({ "PublicInnerClass", "unchecked" }) + public static class FirstStage implements StreamerStage<Quote> { + /** {@inheritDoc} */ + @Override public String name() { + return getClass().getSimpleName(); + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Quote> quotes) { + StreamerWindow win = ctx.window("stage1"); + + // Add numbers to window. + win.enqueueAll(quotes); + + Collection<Quote> polled = win.pollEvictedBatch(); + + if (!polled.isEmpty()) { + Map<String, Bar> map = new HashMap<>(); + + for (Quote quote : polled) { + String symbol = quote.symbol(); + + Bar bar = map.get(symbol); + + if (bar == null) + map.put(symbol, bar = new Bar(symbol)); + + bar.update(quote.price()); + } + + return Collections.<String, Collection<?>>singletonMap(ctx.nextStageName(), map.values()); + } + + return null; + } + } + + /** + * The second stage where 2 second bars are built. + */ + @SuppressWarnings({ "PublicInnerClass", "unchecked" }) + public static class SecondStage implements StreamerStage<Bar> { + /** {@inheritDoc} */ + @Override public String name() { + return getClass().getSimpleName(); + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Bar> bars) { + ConcurrentMap<String, Bar> loc = ctx.localSpace(); + + StreamerWindow win = ctx.window("stage2"); + + // Add numbers to window. + win.enqueueAll(bars); + + Collection<Bar> polled = win.pollEvictedBatch(); + + if (!polled.isEmpty()) { + Map<String, Bar> map = new HashMap<>(); + + for (Bar polledBar : polled) { + String symbol = polledBar.symbol(); + + Bar bar = map.get(symbol); + + if (bar == null) + map.put(symbol, bar = new Bar(symbol)); + + bar.update(polledBar); + } + + loc.putAll(map); + } + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingRunningAverageExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingRunningAverageExample.java b/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingRunningAverageExample.java new file mode 100644 index 0000000..724c9da --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples8/streaming/StreamingRunningAverageExample.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.streamer.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Example to demonstrate how to compute a running average. In this example + * random numbers are being streamed into the system and the streamer + * continuously maintains a running average over last {@code 500} numbers. + * <p> + * Remote nodes should always be started with special configuration file: + * {@code 'ignite.{sh|bat} examples/config/example-streamer.xml'}. + * When starting nodes this way JAR file containing the examples code + * should be placed to {@code IGNITE_HOME/libs} folder. You can build + * {@code ignite-examples.jar} by running {@code mvn package} in + * {@code IGNITE_HOME/examples} folder. After that {@code ignite-examples.jar} + * will be generated by Maven in {@code IGNITE_HOME/examples/target} folder. + * <p> + * Alternatively you can run {@link org.apache.ignite.examples8.streaming.StreamingNodeStartup} in another JVM which will start node + * with {@code examples/config/example-streamer.xml} configuration. + */ +public class StreamingRunningAverageExample { + /** + * Main method. + * + * @param args Parameters. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + Ignite ignite = Ignition.start("examples/config/example-streamer.xml"); + + System.out.println(); + System.out.println(">>> Streaming running average example started."); + + final IgniteStreamer streamer = ignite.streamer("running-average"); + + final int rndRange = 100; + + // This thread executes a query across all nodes + // to collect a running average from all of them. + // During reduce step the results are collected + // and reduced into one average value. + Thread qryThread = new Thread(() -> { + while (!Thread.interrupted()) { + try { + try { + Thread.sleep(3000); + } + catch (InterruptedException ignore) { + return; + } + + // Running average. + double avg = streamer.context().reduce( + ctx -> ctx.<String, Average>localSpace().get("avg"), + new IgniteReducer<Average, Double>() { + private Average avg = new Average(); + + @Override public boolean collect(@Nullable Average a) { + if (a != null) + avg.add(a); + + return true; + } + + @Override public Double reduce() { + return avg.average(); + } + } + ); + + System.out.println("Got streamer query result [avg=" + avg + ", idealAvg=" + (rndRange / 2) + ']'); + } + catch (IgniteException e) { + System.out.println("Failed to execute streamer query: " + e); + } + } + }); + + // This thread continuously stream events + // into the system. + Thread evtThread = new Thread(() -> { + Random rnd = new Random(); + + while (!Thread.interrupted()) { + try { + streamer.addEvent(rnd.nextInt(rndRange)); + } + catch (IgniteException e) { + System.out.println("Failed to add streamer event: " + e); + } + } + }); + + try { + System.out.println(">>> Starting streamer query and producer threads. Press enter to stop this example."); + + qryThread.start(); + evtThread.start(); + + try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) { + in.readLine(); + } + } + finally { + qryThread.interrupt(); + evtThread.interrupt(); + + qryThread.join(); + evtThread.join(); + + Ignition.stopAll(false); + } + } + + /** + * Sample streamer stage to compute average. + */ + public static class StreamerStage implements org.apache.ignite.streamer.StreamerStage<Integer> { + /** {@inheritDoc} */ + @Override public String name() { + return "exampleStage"; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> evts) { + ConcurrentMap<String, Average> loc = ctx.localSpace(); + + Average avg = loc.get("avg"); + + // Store average in local space if it was not done before. + if (avg == null) { + Average old = loc.putIfAbsent("avg", avg = new Average()); + + if (old != null) + avg = old; + } + + // For every input event, update the average. + for (Integer e : evts) + avg.add(e, 1); + + StreamerWindow<Integer> win = ctx.window(); + + // Add input events to window. + win.enqueueAll(evts); + + while (true) { + Integer e = win.pollEvicted(); + + if (e == null) + break; + + // Subtract evicted events from running average. + avg.add(-e, -1); + } + + return null; + } + } + + /** + * Class to help calculate average. + */ + public static class Average { + /** */ + private int total; + + /** */ + private int cnt; + + /** + * Adds one average to another. + * + * @param avg Average to add. + */ + public void add(Average avg) { + int total; + int cnt; + + synchronized (avg) { + total = avg.total; + cnt = avg.cnt; + } + + add(total, cnt); + } + + /** + * Adds passed in values to current values. + * <p> + * Note that this method is synchronized because multiple + * threads will be updating the same average instance concurrently. + * + * @param total Total delta. + * @param cnt Count delta. + */ + public synchronized void add(int total, int cnt) { + this.total += total; + this.cnt += cnt; + } + + /** + * Calculates current average based on total value and count. + * <p> + * Note that this method is synchronized because multiple + * threads will be updating the same average instance concurrently. + + * @return Running average. + */ + public synchronized double average() { + return (double)total / cnt; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/main/java8/org/apache/ignite/examples8/streaming/package.html ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples8/streaming/package.html b/examples/src/main/java8/org/apache/ignite/examples8/streaming/package.html new file mode 100644 index 0000000..112613f --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples8/streaming/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains streaming examples. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/BasicExamplesMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/BasicExamplesMultiNodeSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/BasicExamplesMultiNodeSelfTest.java new file mode 100644 index 0000000..8c610a7 --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/BasicExamplesMultiNodeSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +/** + * Closure examples multi-node self test. + */ +public class BasicExamplesMultiNodeSelfTest extends BasicExamplesSelfTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startRemoteNodes(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/BasicExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/BasicExamplesSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/BasicExamplesSelfTest.java new file mode 100644 index 0000000..0566930 --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/BasicExamplesSelfTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +import org.apache.ignite.examples8.computegrid.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * Closure examples self test. + */ +public class BasicExamplesSelfTest extends GridAbstractExamplesTest { + /** + * @throws Exception If failed. + */ + public void testBroadcastExample() throws Exception { + ComputeBroadcastExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCallableExample() throws Exception { + ComputeCallableExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testClosureExample() throws Exception { + ComputeClosureExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testExecutorExample() throws Exception { + ComputeExecutorServiceExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testReducerExample() throws Exception { + ComputeReducerExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testRunnableExample() throws Exception { + ComputeRunnableExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testTaskMapExample() throws Exception { + ComputeTaskMapExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testTaskSplitExample() throws Exception { + ComputeTaskSplitExample.main(EMPTY_ARGS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/CacheExamplesMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/CacheExamplesMultiNodeSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/CacheExamplesMultiNodeSelfTest.java new file mode 100644 index 0000000..b5015ca --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/CacheExamplesMultiNodeSelfTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +/** + * Cache examples multi-node self test. + */ +public class CacheExamplesMultiNodeSelfTest extends CacheExamplesSelfTest { + /** {@inheritDoc} */ + @Override protected String defaultConfig() { + return "examples/config/example-cache.xml"; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startRemoteNodes(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60 * 1000; + } + + /** {@inheritDoc} */ + @Override public void testCacheStoreExample() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void testCacheStoreLoaderExample() throws Exception { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/CacheExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/CacheExamplesSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/CacheExamplesSelfTest.java new file mode 100644 index 0000000..eff8917 --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/CacheExamplesSelfTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +import org.apache.ignite.examples8.datagrid.*; +import org.apache.ignite.examples8.datagrid.starschema.*; +import org.apache.ignite.examples8.datagrid.store.*; +import org.apache.ignite.examples8.datastructures.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * Cache examples self test. + */ +public class CacheExamplesSelfTest extends GridAbstractExamplesTest { + /** + * @throws Exception If failed. + */ + public void testCacheAffinityExample() throws Exception { + CacheAffinityExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheAtomicLongExample() throws Exception { + IgniteAtomicLongExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheAtomicReferenceExample() throws Exception { + IgniteAtomicReferenceExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheAtomicSequenceExample() throws Exception { + IgniteAtomicSequenceExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheAtomicStampedExample() throws Exception { + IgniteAtomicStampedExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheCountDownLatchExample() throws Exception { + IgniteCountDownLatchExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheQueueExample() throws Exception { + IgniteQueueExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheSetExample() throws Exception { + IgniteSetExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheStoreExample() throws Exception { + CacheStoreExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheQueryExample() throws Exception { + CacheQueryExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheApiExample() throws Exception { + CacheApiExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheTransactionExample() throws Exception { + CacheTransactionExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheDataStreamerExample() throws Exception { + CacheDataStreamerExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testPopularNumbersRealTimeExample() throws Exception { + CachePopularNumbersExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheStoreLoaderExample() throws Exception { + CacheStoreLoadDataExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCachePutGetExample() throws Exception { + CachePutGetExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testSnowflakeSchemaExample() throws Exception { + CacheStarSchemaExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ + public void testCacheContinuousQueryExample() throws Exception { + CacheContinuousQueryExample.main(EMPTY_ARGS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/CacheStoreLoadDataExampleMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/CacheStoreLoadDataExampleMultiNodeSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/CacheStoreLoadDataExampleMultiNodeSelfTest.java new file mode 100644 index 0000000..4b0ffed --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/CacheStoreLoadDataExampleMultiNodeSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +import org.apache.ignite.examples8.datagrid.store.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * + */ +public class CacheStoreLoadDataExampleMultiNodeSelfTest extends GridAbstractExamplesTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + for (int i = 0; i < RMT_NODES_CNT; i++) + startGrid("node-" + i, CacheNodeWithStoreStartup.configure()); + } + + /** + * @throws Exception If failed. + */ + public void testCacheStoreLoaderExample() throws Exception { + CacheStoreLoadDataExample.main(EMPTY_ARGS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/CheckpointExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/CheckpointExamplesSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/CheckpointExamplesSelfTest.java new file mode 100644 index 0000000..feba050 --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/CheckpointExamplesSelfTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +import org.apache.ignite.examples8.computegrid.failover.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * Checkpoint examples self test. + */ +public class CheckpointExamplesSelfTest extends GridAbstractExamplesTest { + /** + * Starts remote nodes before each test. + * + * Note: using beforeTestsStarted() to start nodes only once won't work. + * + * @throws Exception If remote nodes start failed. + */ + @Override protected void beforeTest() throws Exception { + for (int i = 0; i < RMT_NODES_CNT; i++) + startGrid("node-" + i, ComputeFailoverNodeStartup.configuration()); + } + + /** + * @throws Exception If failed. + */ + public void testCheckpointExample() throws Exception { + ComputeFailoverExample.main(EMPTY_ARGS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/ComputeClusterGroupsExampleSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/ComputeClusterGroupsExampleSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/ComputeClusterGroupsExampleSelfTest.java new file mode 100644 index 0000000..2a719de --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/ComputeClusterGroupsExampleSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +import org.apache.ignite.examples8.computegrid.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * + */ +public class ComputeClusterGroupsExampleSelfTest extends GridAbstractExamplesTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + // Start up a node. + startGrid("ignite-cluster-groups-example", DFLT_CFG); + } + + /** + * @throws Exception If failed. + */ + public void testComputeClusterGroupsExample() throws Exception { + ComputeClusterGroupsExample.main(EMPTY_ARGS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/ContinuationExamplesMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/ContinuationExamplesMultiNodeSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/ContinuationExamplesMultiNodeSelfTest.java new file mode 100644 index 0000000..955df50 --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/ContinuationExamplesMultiNodeSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +/** + * Continuation example multi-node self test. + */ +public class ContinuationExamplesMultiNodeSelfTest extends ContinuationExamplesSelfTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startRemoteNodes(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/ContinuationExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/ContinuationExamplesSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/ContinuationExamplesSelfTest.java new file mode 100644 index 0000000..6d2c061 --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/ContinuationExamplesSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +import org.apache.ignite.examples8.computegrid.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * Continuation example self test. + */ +public class ContinuationExamplesSelfTest extends GridAbstractExamplesTest { + /** + * @throws Exception If failed. + */ + public void testContinuationExample() throws Exception { + ComputeFibonacciContinuationExample.main(EMPTY_ARGS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/ContinuousMapperExamplesMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/ContinuousMapperExamplesMultiNodeSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/ContinuousMapperExamplesMultiNodeSelfTest.java new file mode 100644 index 0000000..6e0293f --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/ContinuousMapperExamplesMultiNodeSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +/** + * ContinuousMapperExample multi-node self test. + */ +public class ContinuousMapperExamplesMultiNodeSelfTest extends ContinuationExamplesSelfTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startRemoteNodes(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/ContinuousMapperExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/ContinuousMapperExamplesSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/ContinuousMapperExamplesSelfTest.java new file mode 100644 index 0000000..ce00520 --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/ContinuousMapperExamplesSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +import org.apache.ignite.examples8.computegrid.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * ContinuousMapperExample self test. + */ +public class ContinuousMapperExamplesSelfTest extends GridAbstractExamplesTest { + /** + * @throws Exception If failed. + */ + public void testContinuousMapperExample() throws Exception { + ComputeContinuousMapperExample.main(EMPTY_ARGS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/DeploymentExamplesMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/DeploymentExamplesMultiNodeSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/DeploymentExamplesMultiNodeSelfTest.java new file mode 100644 index 0000000..7d6dfcf --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/DeploymentExamplesMultiNodeSelfTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +/** + * Deployment examples multi-node self test. + */ +public class DeploymentExamplesMultiNodeSelfTest extends DeploymentExamplesSelfTest { + /** {@inheritDoc} */ + @Override public void testDeploymentExample() throws Exception { + startRemoteNodes(); + + super.testDeploymentExample(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/DeploymentExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/DeploymentExamplesSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/DeploymentExamplesSelfTest.java new file mode 100644 index 0000000..33371e8 --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/DeploymentExamplesSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +import org.apache.ignite.examples8.misc.deployment.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * Deployment examples self test. + */ +public class DeploymentExamplesSelfTest extends GridAbstractExamplesTest { + /** + * @throws Exception If failed. + */ + public void testDeploymentExample() throws Exception { + DeploymentExample.main(EMPTY_ARGS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28720c44/examples/src/test/java8/org/apache/ignite/examples8/EventsExamplesMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/examples8/EventsExamplesMultiNodeSelfTest.java b/examples/src/test/java8/org/apache/ignite/examples8/EventsExamplesMultiNodeSelfTest.java new file mode 100644 index 0000000..5f4d53e --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/examples8/EventsExamplesMultiNodeSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples8; + +/** + * Events examples multi-node self test. + */ +public class EventsExamplesMultiNodeSelfTest extends EventsExamplesSelfTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startRemoteNodes(); + } +}