sp-2 streaming cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1cc3d528 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1cc3d528 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1cc3d528 Branch: refs/heads/sprint-2-streaming-cleanup Commit: 1cc3d528f8b04e6d5760de297ccfd18afac5c087 Parents: 448e9c3 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Thu Mar 19 19:50:02 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Thu Mar 19 19:50:02 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/examples/ExamplesUtils.java | 18 - .../streaming/StreamingCheckInExample.java | 580 -------- .../streaming/StreamingNodeStartup.java | 35 - .../StreamingPopularNumbersExample.java | 250 ---- .../streaming/StreamingPriceBarsExample.java | 451 ------ .../StreamingRunningAverageExample.java | 247 ---- .../ignite/examples/streaming/package-info.java | 22 - .../src/main/java/org/apache/ignite/Ignite.java | 17 - .../java/org/apache/ignite/IgniteStreamer.java | 154 -- .../org/apache/ignite/cluster/ClusterGroup.java | 9 - .../configuration/IgniteConfiguration.java | 23 - .../ignite/internal/GridKernalContext.java | 8 - .../ignite/internal/GridKernalContextImpl.java | 12 - .../apache/ignite/internal/IgniteKernal.java | 26 - .../org/apache/ignite/internal/IgnitionEx.java | 12 - .../internal/cluster/ClusterGroupAdapter.java | 40 - .../cluster/IgniteClusterAsyncImpl.java | 5 - .../communication/GridIoMessageFactory.java | 16 - .../GridDeploymentPerLoaderStore.java | 1 - .../GridDeploymentPerVersionStore.java | 1 - .../streamer/GridStreamProcessor.java | 347 ----- .../streamer/GridStreamerAttributes.java | 127 -- .../streamer/GridStreamerCancelRequest.java | 110 -- .../streamer/GridStreamerContextDelegate.java | 102 -- .../streamer/GridStreamerContextImpl.java | 201 --- .../streamer/GridStreamerExecutionBatch.java | 175 --- .../streamer/GridStreamerExecutionRequest.java | 293 ---- .../streamer/GridStreamerResponse.java | 144 -- .../GridStreamerRouteFailedException.java | 36 - .../GridStreamerStageExecutionFuture.java | 340 ----- .../streamer/GridStreamerWindowIterator.java | 40 - .../processors/streamer/IgniteStreamerEx.java | 94 -- .../processors/streamer/IgniteStreamerImpl.java | 1375 ------------------ .../streamer/StreamerMBeanAdapter.java | 116 -- .../streamer/StreamerMetricsAdapter.java | 272 ---- .../streamer/StreamerMetricsHolder.java | 424 ------ .../streamer/StreamerStageMBeanAdapter.java | 101 -- .../streamer/StreamerStageMetricsAdapter.java | 135 -- .../streamer/StreamerStageMetricsHolder.java | 167 --- .../streamer/StreamerStageWrapper.java | 90 -- .../streamer/StreamerWindowMetricsAdapter.java | 65 - .../streamer/StreamerWindowMetricsHolder.java | 50 - .../processors/streamer/package-info.java | 22 - .../task/GridStreamerBroadcastTask.java | 137 -- .../streamer/task/GridStreamerQueryTask.java | 140 -- .../streamer/task/GridStreamerReduceTask.java | 144 -- .../ignite/internal/util/IgniteUtils.java | 23 - .../visor/node/VisorGridConfiguration.java | 12 - .../visor/node/VisorNodeDataCollectorJob.java | 35 - .../node/VisorNodeDataCollectorJobResult.java | 11 - .../visor/node/VisorNodeDataCollectorTask.java | 3 - .../node/VisorNodeDataCollectorTaskResult.java | 12 - .../internal/visor/streamer/VisorStreamer.java | 104 -- .../streamer/VisorStreamerConfiguration.java | 177 --- .../visor/streamer/VisorStreamerMetrics.java | 350 ----- .../streamer/VisorStreamerMetricsResetTask.java | 75 - .../visor/streamer/VisorStreamerResetTask.java | 75 - .../streamer/VisorStreamerStageMetrics.java | 277 ---- .../ignite/streamer/StreamerConfiguration.java | 249 ---- .../apache/ignite/streamer/StreamerContext.java | 142 -- .../ignite/streamer/StreamerEventRouter.java | 55 - .../streamer/StreamerEventRouterAdapter.java | 60 - .../streamer/StreamerFailureListener.java | 44 - .../apache/ignite/streamer/StreamerMBean.java | 169 --- .../apache/ignite/streamer/StreamerMetrics.java | 209 --- .../apache/ignite/streamer/StreamerStage.java | 61 - .../ignite/streamer/StreamerStageMBean.java | 114 -- .../ignite/streamer/StreamerStageMetrics.java | 93 -- .../apache/ignite/streamer/StreamerWindow.java | 206 --- .../ignite/streamer/StreamerWindowMBean.java | 58 - .../ignite/streamer/StreamerWindowMetrics.java | 44 - .../ignite/streamer/index/StreamerIndex.java | 305 ---- .../streamer/index/StreamerIndexEntry.java | 57 - .../streamer/index/StreamerIndexPolicy.java | 50 - .../streamer/index/StreamerIndexProvider.java | 107 -- .../index/StreamerIndexProviderAdapter.java | 796 ---------- .../index/StreamerIndexProviderMBean.java | 74 - .../streamer/index/StreamerIndexUpdateSync.java | 77 - .../streamer/index/StreamerIndexUpdater.java | 88 -- .../index/hash/StreamerHashIndexProvider.java | 500 ------- .../streamer/index/hash/package-info.java | 21 - .../ignite/streamer/index/package-info.java | 21 - .../index/tree/StreamerTreeIndexProvider.java | 953 ------------ .../streamer/index/tree/package-info.java | 21 - .../apache/ignite/streamer/package-info.java | 21 - .../router/StreamerAffinityEventRouter.java | 150 -- .../StreamerCacheAffinityEventRouter.java | 70 - .../router/StreamerLocalEventRouter.java | 46 - .../router/StreamerRandomEventRouter.java | 89 -- .../router/StreamerRoundRobinEventRouter.java | 53 - .../ignite/streamer/router/package-info.java | 21 - .../window/StreamerBoundedSizeBatchWindow.java | 804 ---------- .../window/StreamerBoundedSizeSortedWindow.java | 218 --- .../window/StreamerBoundedSizeWindow.java | 144 -- .../StreamerBoundedSizeWindowAdapter.java | 357 ----- .../window/StreamerBoundedTimeBatchWindow.java | 906 ------------ .../window/StreamerBoundedTimeWindow.java | 462 ------ .../window/StreamerUnboundedWindow.java | 111 -- .../streamer/window/StreamerWindowAdapter.java | 537 ------- .../ignite/streamer/window/package-info.java | 21 - .../streamer/GridStreamerEvictionSelfTest.java | 163 --- .../streamer/GridStreamerFailoverSelfTest.java | 267 ---- .../GridStreamerLifecycleAwareSelfTest.java | 212 --- .../streamer/GridStreamerSelfTest.java | 796 ---------- .../processors/streamer/GridTestStage.java | 52 - .../streamer/GridTestStreamerEventRouter.java | 49 - .../ignite/internal/processors/streamer/SC.java | 31 - .../ignite/loadtests/streamer/EventClosure.java | 60 - .../streamer/GridStreamerBenchmark.java | 140 -- .../streamer/GridStreamerIndexLoadTest.java | 179 --- .../loadtests/streamer/GridStreamerLoad.java | 62 - .../ignite/loadtests/streamer/IndexUpdater.java | 46 - .../ignite/loadtests/streamer/QueryClosure.java | 107 -- .../loadtests/streamer/average/TestAverage.java | 70 - .../loadtests/streamer/average/TestStage.java | 63 - .../marshaller/GridMarshallerAbstractTest.java | 56 - .../index/GridStreamerIndexSelfTest.java | 686 --------- .../window/GridStreamerWindowSelfTest.java | 911 ------------ .../ignite/testframework/junits/IgniteMock.java | 10 - .../ignite/testsuites/IgniteBasicTestSuite.java | 3 - .../testsuites/IgniteStreamerSelfTestSuite.java | 46 - .../org/apache/ignite/IgniteSpringBean.java | 14 - 122 files changed, 20935 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/examples/src/main/java/org/apache/ignite/examples/ExamplesUtils.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ExamplesUtils.java b/examples/src/main/java/org/apache/ignite/examples/ExamplesUtils.java index a1d59c9..a506595 100644 --- a/examples/src/main/java/org/apache/ignite/examples/ExamplesUtils.java +++ b/examples/src/main/java/org/apache/ignite/examples/ExamplesUtils.java @@ -17,9 +17,7 @@ package org.apache.ignite.examples; -import org.apache.ignite.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.streamer.*; import java.net.*; @@ -80,20 +78,4 @@ public class ExamplesUtils { return true; } - - /** - * @param ignite Ignite. - * @param name Streamer name. - * @return {@code True} if ignite has streamer with given name. - */ - public static boolean hasStreamer(Ignite ignite, String name) { - if (ignite.configuration().getStreamerConfiguration() != null) { - for (StreamerConfiguration cfg : ignite.configuration().getStreamerConfiguration()) { - if (name.equals(cfg.getName())) - return true; - } - } - - return false; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingCheckInExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingCheckInExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingCheckInExample.java deleted file mode 100644 index b3f1948..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingCheckInExample.java +++ /dev/null @@ -1,580 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.index.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * This streamer example is inspired by <a href="https://foursquare.com/">Foursquare</a> - * project. It shows the usage of window indexes, and, particularly, - * unique index, which allows to skip some duplicated events based on a certain - * criteria. - * <p> - * In this example we have a number of places with 2D locations, and a number - * of users who perform check-ins in random locations from time to time. - * A check-in is a streamer event, which is processed through the pipeline - * of 2 stages and added sequentially to two windows, both of which hold entries - * within past 10 seconds (the rest of the entries are evicted). - * <p> - * First stage simply ensures the user does not check-in twice within the - * 10 second time interval. This check is done by using a unique window hash index - * with user name as a key. In case of a duplicate, an error is handled and - * reported. - * <p> - * Second stage checks if a user has checked-in in one of the tracked places. - * If that's the case, then an info entry is added to a second window. Again, - * it is only valid for 10 seconds, and is evicted afterwards. - * <p> - * There is a separate timer task, which polls a second window index and displays - * the users that have checked-in in the known places within the last 10 seconds. - * <p> - * Remote nodes should always be started with special configuration file: - * {@code 'ignite.{sh|bat} examples/config/example-streamer.xml'}. - * When starting nodes this way JAR file containing the examples code - * should be placed to {@code IGNITE_HOME/libs} folder. You can build - * {@code ignite-examples.jar} by running {@code mvn package} in - * {@code IGNITE_HOME/examples} folder. After that {@code ignite-examples.jar} - * will be generated by Maven in {@code IGNITE_HOME/examples/target} folder. - * <p> - * Alternatively you can run {@link StreamingNodeStartup} in another JVM which will start node - * with {@code examples/config/example-streamer.xml} configuration. - */ -public class StreamingCheckInExample { - /** Streamer name. */ - private static final String STREAMER_NAME = "check-in"; - - /** - * Nearby distance. Locations with distance less than or equal - * to this one are considered to be nearby. - */ - private static final double NEARBY_DISTANCE = 5.0d; - - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** Total number of events to generate. */ - private static final int CNT = 60; - - /** User names. */ - private static final String[] USER_NAMES = { - "Alice", "Bob", "Ann", "Joe", "Mary", "Peter", "Lisa", "Tom", "Kate", "Sam" - }; - - /** Places, for which to track user check-ins. */ - private static final Place[] TRACKED_PLACES = { - new Place("Theatre", new Location(1.234, 2.567)), - new Place("Bowling", new Location(10.111, 5.213)), - new Place("Bar", new Location(15.199, 16.781)), - new Place("Cinema", new Location(3.77, 20.239)) - }; - - /** Max X coordinate. */ - private static final int MAX_X = 30; - - /** Max Y coordinate. */ - private static final int MAX_Y = 30; - - /** - * Executes example. - * - * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. - */ - public static void main(String[] args) throws IgniteException { - Timer timer = new Timer("check-in-query-worker"); - - // Start ignite. - final Ignite ignite = Ignition.start("examples/config/example-streamer.xml"); - - System.out.println(); - System.out.println(">>> Streaming check-in example started."); - - try { - // Get the streamer. - IgniteStreamer streamer = ignite.streamer(STREAMER_NAME); - - assert streamer != null; - - // Add a failure listener. - streamer.addStreamerFailureListener(new StreamerFailureListener() { - @Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) { - System.err.println("Failure [stage=" + stageName + ", evts=" + evts + ", err=" + err.getMessage()); - } - }); - - // Periodically display users, who have checked-in in known places. - scheduleQuery(streamer, timer); - - // Stream the check-in events. - streamData(streamer); - - timer.cancel(); - - // Reset all streamers on all nodes to make sure that - // consecutive executions start from scratch. - ignite.compute().broadcast(new IgniteRunnable() { - @Override public void run() { - if (!ExamplesUtils.hasStreamer(ignite, STREAMER_NAME)) - System.err.println("Default streamer not found (is example-streamer.xml " + - "configuration used on all nodes?)"); - else { - IgniteStreamer streamer = ignite.streamer(STREAMER_NAME); - - System.out.println("Clearing streamer data."); - - streamer.reset(); - } - } - }); - } - finally { - Ignition.stop(true); - } - } - - /** - * Schedules the query to periodically output the users, who have - * checked-in in tracked places. - * - * @param streamer Streamer. - * @param timer Timer. - */ - private static void scheduleQuery(final IgniteStreamer streamer, Timer timer) { - TimerTask task = new TimerTask() { - @Override public void run() { - try { - // Send reduce query to all streamers running on local and remote noes. - Map<String, Place> userPlaces = streamer.context().reduce( - // This closure will execute on remote nodes. - new IgniteClosure<StreamerContext, Map<String, Place>>() { - @Override public Map<String, Place> apply( - StreamerContext ctx) { - StreamerWindow<LocationInfo> win = - ctx.window(DetectPlacesStage.class.getSimpleName()); - - assert win != null; - - StreamerIndex<LocationInfo, String, Place> idxView = win.index(); - - Collection<StreamerIndexEntry<LocationInfo, String, Place>> entries = - idxView.entries(0); - - Map<String, Place> ret = new HashMap<>(entries.size(), 1.0f); - - for (StreamerIndexEntry<LocationInfo, String, Place> e : entries) - ret.put(e.key(), e.value()); - - return ret; - } - }, - new IgniteReducer<Map<String, Place>, Map<String, Place>>() { - private Map<String, Place> map; - - @Override public boolean collect(@Nullable Map<String, Place> m) { - if (m == null) - return false; - - if (map != null) - map.putAll(m); - else - map = m; - - return true; - } - - @Override public Map<String, Place> reduce() { - return map; - } - } - ); - - StringBuilder sb = new StringBuilder("----------------\n"); - - for (Map.Entry<String, Place> userPlace : userPlaces.entrySet()) - sb.append(String.format("%s is at the %s (%s)\n", userPlace.getKey(), - userPlace.getValue().name(), userPlace.getValue().location())); - - sb.append("----------------\n"); - - System.out.print(sb.toString()); - } - catch (IgniteException e) { - e.printStackTrace(); - } - } - }; - - // Run task every 3 seconds. - timer.schedule(task, 3000, 3000); - } - - /** - * Streams check-in events into the system. - * - * @param streamer Streamer. - * @throws IgniteException If failed. - */ - @SuppressWarnings("BusyWait") - private static void streamData(IgniteStreamer streamer) throws IgniteException { - try { - for (int i = 0; i < CNT; i++) { - CheckInEvent evt = new CheckInEvent( - USER_NAMES[ThreadLocalRandom.current().nextInt(USER_NAMES.length)], - new Location( - RAND.nextDouble() + RAND.nextInt(MAX_X - 1), - RAND.nextDouble() + RAND.nextInt(MAX_Y)) - ); - - System.out.println(">>> Generating event: " + evt); - - streamer.addEvent(evt); - - Thread.sleep(1000); - } - } - catch (InterruptedException ignored) { - // No-op. - } - } - - /** - * Entity class that represents a 2D location. - */ - private static class Location { - /** Check-in location on X axis (longitude). */ - private final double x; - - /** Check-in location on Y axis (latitude). */ - private final double y; - - /** - * @param x X value. - * @param y Y value. - */ - Location(double x, double y) { - this.x = x; - this.y = y; - } - - /** - * @return Check-in location on X axis (longitude). - */ - public double x() { - return x; - } - - /** - * @return Check-in location on Y axis (latitude). - */ - public double y() { - return y; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "Location [x=" + x + ", y=" + y + ']'; - } - } - - /** - * Entity class representing a place, where - * users can check-in. - */ - private static class Place { - /** Place name. */ - private final String name; - - /** Location. */ - private final Location location; - - /** - * @param name Name. - * @param location Location. - */ - Place(String name, Location location) { - this.name = name; - this.location = location; - } - - /** - * @return Place name. - */ - public String name() { - return name; - } - - /** - * @return Location. - */ - public Location location() { - return location; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "Place [name=" + name + ", location=" + location + ']'; - } - } - - /** - * Check-in event. - */ - private static class CheckInEvent { - /** User name. */ - private final String userName; - - /** User location. */ - private final Location location; - - /** - * @param userName User name. - * @param location Location. - */ - CheckInEvent(String userName, Location location) { - this.userName = userName; - this.location = location; - } - - /** - * @return User name. - */ - public String userName() { - return userName; - } - - /** - * @return User location. - */ - public Location location() { - return location; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "CheckInEvent [userName=" + userName + ", location=" + location + ']'; - } - } - - /** - * Helper data structure for keeping information about - * check-in location and a corresponding place if found. - */ - private static class LocationInfo { - /** User name. */ - private final String userName; - - /** A detected check-in place. */ - private final Place place; - - /** - * @param userName User name. - * @param place Place. - */ - LocationInfo(String userName, Place place) { - this.userName = userName; - this.place = place; - } - - /** - * @return User name. - */ - public String userName() { - return userName; - } - - /** - * @return A detected check-in place. - */ - public Place place() { - return place; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "LocationInfo [userName=" + userName + ", place=" + place + ']'; - } - } - - /** - * Check-in event processing stage that adds events window - * with unique index to block repetitive check-ins. - */ - @SuppressWarnings("PublicInnerClass") - public static class AddToWindowStage implements StreamerStage<CheckInEvent> { - /** {@inheritDoc} */ - @Override public String name() { - return getClass().getSimpleName(); - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<String, Collection<?>> run( - StreamerContext ctx, Collection<CheckInEvent> evts) { - StreamerWindow<CheckInEvent> win = ctx.window(name()); - - assert win != null; - - Collection<CheckInEvent> evts0 = new LinkedList<>(); - - // Add events to window. Our unique index should reject - // repetitive check-ins within a period of time, defined - // by the window. - for (CheckInEvent evt : evts) { - try { - win.enqueue(evt); - - evts0.add(evt); - } - catch (IgniteException e) { - if (e.getMessage().contains("Index unique key violation")) - System.err.println("Cannot check-in twice within the specified period of time [evt=" + evt + ']'); - else - throw e; - } - } - - // Clear evicted events. - win.pollEvictedAll(); - - // Move to the next stage in pipeline, if there are valid events. - if (!evts0.isEmpty()) - return Collections.<String, Collection<?>>singletonMap(ctx.nextStageName(), evts0); - - // Break the pipeline execution. - return null; - } - } - - /** - * Check-in event processing stage that detects the - * check-in places. - */ - private static class DetectPlacesStage implements StreamerStage<CheckInEvent> { - /** {@inheritDoc} */ - @Override public String name() { - return getClass().getSimpleName(); - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, - Collection<CheckInEvent> evts) { - StreamerWindow<LocationInfo> win = ctx.window(name()); - - assert win != null; - - for (CheckInEvent evt : evts) { - for (Place place : TRACKED_PLACES) { - if (distance(evt.location(), place.location()) <= NEARBY_DISTANCE) { - win.enqueue(new LocationInfo(evt.userName(), place)); - - break; - } - } - } - - // Clear evicted location infos. - win.pollEvictedAll(); - - // Null means there are no more stages and - // we should finish the pipeline. - return null; - } - - /** - * Calculates the distance between 2 locations. - * - * @param loc1 First location. - * @param loc2 Second location. - * @return Distance between locations. - */ - private double distance(Location loc1, Location loc2) { - double xDiff = Math.abs(loc1.x() - loc2.x()); - double yDiff = Math.abs(loc1.y() - loc2.y()); - - // Return a vector distance between the points. - return Math.sqrt(xDiff * xDiff + yDiff * yDiff); - } - } - - /** - * Index updater for check-in events. Updaters are specified for {@link StreamerIndexProviderAdapter} in - * streamer configuration. - */ - private static class CheckInEventIndexUpdater implements StreamerIndexUpdater<CheckInEvent, String, Location> { - /** {@inheritDoc} */ - @Nullable @Override public String indexKey(CheckInEvent evt) { - return evt.userName(); // Index key is an event user name. - } - - /** {@inheritDoc} */ - @Nullable @Override public Location initialValue(CheckInEvent evt, String key) { - return evt.location(); // Index value is an event location. - } - - /** {@inheritDoc} */ - @Nullable @Override public Location onAdded( - StreamerIndexEntry<CheckInEvent, String, Location> entry, - CheckInEvent evt) { - throw new AssertionError("onAdded() shouldn't be called on unique index."); - } - - /** {@inheritDoc} */ - @Nullable @Override public Location onRemoved( - StreamerIndexEntry<CheckInEvent, String, Location> entry, - CheckInEvent evt) { - return null; - } - } - - /** - * Index updater for location info. Updaters are specified for {@link StreamerIndexProviderAdapter} in - * streamer configuration. - */ - private static class PlacesIndexUpdater implements StreamerIndexUpdater<LocationInfo, String, Place> { - /** {@inheritDoc} */ - @Nullable @Override public String indexKey(LocationInfo info) { - return info.userName(); - } - - /** {@inheritDoc} */ - @Nullable @Override public Place initialValue(LocationInfo info, String key) { - return info.place(); - } - - /** {@inheritDoc} */ - @Nullable @Override public Place onAdded( - StreamerIndexEntry<LocationInfo, String, Place> entry, - LocationInfo evt) { - throw new AssertionError("onAdded() shouldn't be called on unique index."); - } - - /** {@inheritDoc} */ - @Nullable @Override public Place onRemoved( - StreamerIndexEntry<LocationInfo, String, Place> entry, - LocationInfo evt) { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingNodeStartup.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingNodeStartup.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingNodeStartup.java deleted file mode 100644 index c4baae2..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingNodeStartup.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming; - -import org.apache.ignite.*; - -/** - * Starts up node with example streamer configuration. - */ -public class StreamingNodeStartup { - /** - * Start up an empty node with specified cache configuration. - * - * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. - */ - public static void main(String[] args) throws IgniteException { - Ignition.start("examples/config/example-streamer.xml"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java deleted file mode 100644 index 95a7272..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.index.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Real time streaming popular numbers counter. This example receives a constant stream of - * random numbers. The gaussian distribution is chosen to make sure that numbers closer - * to 0 have higher probability. This example will find {@link #POPULAR_NUMBERS_CNT} number - * of popular numbers over last N number of numbers, where N is specified as streamer - * window size in {@code examples/config/example-streamer.xml} configuration file and - * is set to {@code 10,000}. - * <p> - * Remote nodes should always be started with special configuration file: - * {@code 'ignite.{sh|bat} examples/config/example-streamer.xml'}. - * When starting nodes this way JAR file containing the examples code - * should be placed to {@code IGNITE_HOME/libs} folder. You can build - * {@code ignite-examples.jar} by running {@code mvn package} in - * {@code IGNITE_HOME/examples} folder. After that {@code ignite-examples.jar} - * will be generated by Maven in {@code IGNITE_HOME/examples/target} folder. - * <p> - * Alternatively you can run {@link StreamingNodeStartup} in another JVM which will start node - * with {@code examples/config/example-streamer.xml} configuration. - */ -public class StreamingPopularNumbersExample { - /** Count of most popular numbers to retrieve from ignite. */ - private static final int POPULAR_NUMBERS_CNT = 10; - - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** Count of total numbers to generate. */ - private static final int CNT = 10_000_000; - - /** Comparator sorting random number entries by number popularity. */ - private static final Comparator<StreamerIndexEntry<Integer, Integer, Long>> CMP = - new Comparator<StreamerIndexEntry<Integer, Integer, Long>>() { - @Override public int compare(StreamerIndexEntry<Integer, Integer, Long> e1, - StreamerIndexEntry<Integer, Integer, Long> e2) { - return e2.value().compareTo(e1.value()); - } - }; - - /** Reducer selecting first POPULAR_NUMBERS_CNT values. */ - private static class PopularNumbersReducer implements IgniteReducer<Collection<StreamerIndexEntry<Integer, Integer, Long>>, - Collection<StreamerIndexEntry<Integer, Integer, Long>>> { - /** */ - private final List<StreamerIndexEntry<Integer, Integer, Long>> sorted = new ArrayList<>(); - - /** {@inheritDoc} */ - @Override public boolean collect(@Nullable Collection<StreamerIndexEntry<Integer, Integer, Long>> col) { - if (col != null && !col.isEmpty()) - // Add result from remote node to sorted set. - sorted.addAll(col); - - return true; - } - - /** {@inheritDoc} */ - @Override public Collection<StreamerIndexEntry<Integer, Integer, Long>> reduce() { - Collections.sort(sorted, CMP); - - return sorted.subList(0, POPULAR_NUMBERS_CNT < sorted.size() ? POPULAR_NUMBERS_CNT : sorted.size()); - } - } - - /** - * Executes example. - * - * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. - */ - public static void main(String[] args) throws IgniteException { - Timer popularNumbersQryTimer = new Timer("numbers-query-worker"); - - // Start ignite. - final Ignite ignite = Ignition.start("examples/config/example-streamer.xml"); - - System.out.println(); - System.out.println(">>> Streaming popular numbers example started."); - - try { - // Schedule query to find most popular words to run every 3 seconds. - TimerTask task = scheduleQuery(ignite, popularNumbersQryTimer); - - streamData(ignite); - - // Force one more run to get final counts. - task.run(); - - popularNumbersQryTimer.cancel(); - - // Reset all streamers on all nodes to make sure that - // consecutive executions start from scratch. - ignite.compute().broadcast(new IgniteRunnable() { - @Override public void run() { - if (!ExamplesUtils.hasStreamer(ignite, "popular-numbers")) - System.err.println("Default streamer not found (is example-streamer.xml " + - "configuration used on all nodes?)"); - else { - IgniteStreamer streamer = ignite.streamer("popular-numbers"); - - System.out.println("Clearing number counters from streamer."); - - streamer.reset(); - } - } - }); - } - finally { - Ignition.stop(true); - } - } - - /** - * Streams random numbers into the system. - * - * @param ignite Ignite. - * @throws IgniteException If failed. - */ - private static void streamData(final Ignite ignite) throws IgniteException { - final IgniteStreamer streamer = ignite.streamer("popular-numbers"); - - // Use gaussian distribution to ensure that - // numbers closer to 0 have higher probability. - for (int i = 0; i < CNT; i++) - streamer.addEvent(((Double)(RAND.nextGaussian() * 10)).intValue()); - } - - /** - * Schedules our popular numbers query to run every 3 seconds. - * - * @param ignite Ignite. - * @param timer Timer. - * @return Scheduled task. - */ - private static TimerTask scheduleQuery(final Ignite ignite, Timer timer) { - TimerTask task = new TimerTask() { - @Override public void run() { - final IgniteStreamer streamer = ignite.streamer("popular-numbers"); - - try { - // Send reduce query to all 'popular-numbers' streamers - // running on local and remote nodes. - Collection<StreamerIndexEntry<Integer, Integer, Long>> col = streamer.context().reduce( - // This closure will execute on remote nodes. - new IgniteClosure<StreamerContext, - Collection<StreamerIndexEntry<Integer, Integer, Long>>>() { - @Override public Collection<StreamerIndexEntry<Integer, Integer, Long>> apply( - StreamerContext ctx) { - StreamerIndex<Integer, Integer, Long> view = ctx.<Integer>window().index(); - - return view.entries(-1 * POPULAR_NUMBERS_CNT); - } - }, - // The reducer will always execute locally, on the same node - // that submitted the query. - new PopularNumbersReducer()); - - for (StreamerIndexEntry<Integer, Integer, Long> cntr : col) - System.out.printf("%3d=%d\n", cntr.key(), cntr.value()); - - System.out.println("----------------"); - } - catch (IgniteException e) { - e.printStackTrace(); - } - } - }; - - timer.schedule(task, 3000, 3000); - - return task; - } - - /** - * Sample streamer stage to compute average. - */ - @SuppressWarnings("PublicInnerClass") - public static class StreamerStage implements org.apache.ignite.streamer.StreamerStage<Integer> { - /** {@inheritDoc} */ - @Override public String name() { - return "exampleStage"; - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> nums) { - StreamerWindow<Integer> win = ctx.window(); - - // Add numbers to window. - win.enqueueAll(nums); - - // Clear evicted numbers. - win.clearEvicted(); - - // Null means that there are no more stages - // and that stage pipeline is completed. - return null; - } - } - - /** - * This class will be set as part of window index configuration. - */ - private static class IndexUpdater implements StreamerIndexUpdater<Integer, Integer, Long> { - /** {@inheritDoc} */ - @Override public Integer indexKey(Integer evt) { - // We use event as index key, so event and key are the same. - return evt; - } - - /** {@inheritDoc} */ - @Nullable @Override public Long onAdded(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) { - return entry.value() + 1; - } - - /** {@inheritDoc} */ - @Nullable @Override public Long onRemoved(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) { - return entry.value() - 1 == 0 ? null : entry.value() - 1; - } - - /** {@inheritDoc} */ - @Override public Long initialValue(Integer evt, Integer key) { - return 1L; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPriceBarsExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPriceBarsExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPriceBarsExample.java deleted file mode 100644 index 6098ffd..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPriceBarsExample.java +++ /dev/null @@ -1,451 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.router.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Example to demonstrate streamer with multiple stages. This example builds price - * bars which are entities that aggregate prices over some time interval. For each - * interval a price bar holds the following metrics: - * <ul> - * <li>Open price - the first price in the time interval.</li> - * <li>High price - the highest price in the interval.</li> - * <li>Low price - the lowest price in the interval.</li> - * <li>Close prices - the last price in the interval.</li> - * </ul> - * In this example trade quotes for several instruments are streamed into the system. - * Constructing of price bars is performed in two stages. The first stage builds price bars - * for one second intervals, and the second stage used results of the first stage to - * build price bars for every 2 second interval. - * <p> - * Note, the bars in the example are not real charting bars, but rather a simplification - * with purpose to demonstrate multi-stage streaming processing. - * <p> - * Remote nodes should always be started with special configuration file: - * {@code 'ignite.{sh|bat} examples/config/example-streamer.xml'}. - * When starting nodes this way JAR file containing the examples code - * should be placed to {@code IGNITE_HOME/libs} folder. You can build - * {@code ignite-examples.jar} by running {@code mvn package} in - * {@code IGNITE_HOME/examples} folder. After that {@code ignite-examples.jar} - * will be generated by Maven in {@code IGNITE_HOME/examples/target} folder. - * <p> - * Alternatively you can run {@link StreamingNodeStartup} in another JVM which will start node - * with {@code examples/config/example-streamer.xml} configuration. - */ -public class StreamingPriceBarsExample { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** Count of total numbers to generate. */ - private static final int CNT = 10000000; - - /** The list of instruments. */ - private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE"}; - - /** The list of initial instrument prices. */ - private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24}; - - /** - * Executes example. - * - * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. - */ - public static void main(String[] args) throws IgniteException { - Timer timer = new Timer("priceBars"); - - // Start ignite. - final Ignite ignite = Ignition.start("examples/config/example-streamer.xml"); - - System.out.println(); - System.out.println(">>> Streaming price bars example started."); - - try { - TimerTask task = scheduleQuery(ignite, timer); - - streamData(ignite); - - // Force one more run to get final results. - task.run(); - - timer.cancel(); - - // Reset all streamers on all nodes to make sure that - // consecutive executions start from scratch. - ignite.compute().broadcast(new IgniteRunnable() { - @Override public void run() { - if (!ExamplesUtils.hasStreamer(ignite, "priceBars")) - System.err.println("Default streamer not found (is example-streamer.xml " + - "configuration used on all nodes?)"); - else { - IgniteStreamer streamer = ignite.streamer("priceBars"); - - System.out.println("Clearing bars from streamer."); - - streamer.reset(); - } - } - }); - } - finally { - Ignition.stop(true); - } - } - - /** - * Schedules the query to periodically output built bars to the console. - * - * @param ignite Ignite. - * @param timer Timer. - * @return Scheduled task. - */ - private static TimerTask scheduleQuery(final Ignite ignite, Timer timer) { - TimerTask task = new TimerTask() { - @Override public void run() { - final IgniteStreamer streamer = ignite.streamer("priceBars"); - - try { - Collection<Bar> bars = streamer.context().reduce( - // This closure will execute on remote nodes. - new IgniteClosure<StreamerContext, Collection<Bar>>() { - @Override public Collection<Bar> apply(StreamerContext ctx) { - Collection<Bar> values = ctx.<String, Bar>localSpace().values(); - - Collection<Bar> res = new ArrayList<>(values.size()); - - for (Bar bar : values) - res.add(bar.copy()); - - return res; - } - }, - // The reducer will always execute locally, on the same node - // that submitted the query. - new IgniteReducer<Collection<Bar>, Collection<Bar>>() { - private final Collection<Bar> res = new ArrayList<>(); - - @Override public boolean collect(@Nullable Collection<Bar> col) { - res.addAll(col); - - return true; - } - - @Override public Collection<Bar> reduce() { - return res; - } - } - ); - - for (Bar bar : bars) - System.out.println(bar.toString()); - - System.out.println("-----------------"); - } - catch (IgniteException e) { - e.printStackTrace(); - } - } - }; - - timer.schedule(task, 2000, 2000); - - return task; - } - - /** - * Streams random prices into the system. - * - * @param ignite Ignite. - * @throws IgniteException If failed. - */ - private static void streamData(final Ignite ignite) throws IgniteException { - IgniteStreamer streamer = ignite.streamer("priceBars"); - - for (int i = 0; i < CNT; i++) { - for (int j = 0; j < INSTRUMENTS.length; j++) { - // Use gaussian distribution to ensure that - // numbers closer to 0 have higher probability. - double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian()); - - Quote quote = new Quote(INSTRUMENTS[j], price); - - streamer.addEvent(quote); - } - } - } - - /** - * Rounds double value to two significant signs. - * - * @param val value to be rounded. - * @return rounded double value. - */ - private static double round2(double val) { - return Math.floor(100 * val + 0.5) / 100; - } - - /** - * Trade quote that is streamed into the system. - */ - private static class Quote implements StreamerAffinityEventRouter.AffinityEvent { - /** Instrument symbol. */ - private final String symbol; - - /** Price. */ - private final double price; - - /** - * @param symbol Symbol. - * @param price Price. - */ - Quote(String symbol, double price) { - this.symbol = symbol; - this.price = price; - } - - /** - * @return Symbol. - */ - public String symbol() { - return symbol; - } - - /** - * @return Price. - */ - public double price() { - return price; - } - - /** {@inheritDoc} */ - @Override public String affinityKey() { - return symbol; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "Quote [symbol=" + symbol + ", price=" + price + ']'; - } - } - - /** - * The bar that is build by the streamer. - */ - private static class Bar implements StreamerAffinityEventRouter.AffinityEvent { - /** Instrument symbol. */ - private final String symbol; - - /** Open price. */ - private volatile double open; - - /** High price. */ - private volatile double high; - - /** Low price. */ - private volatile double low = Long.MAX_VALUE; - - /** Close price. */ - private volatile double close; - - /** - * @param symbol Symbol. - */ - Bar(String symbol) { - this.symbol = symbol; - } - - /** - * @return Copy of this instance. - */ - public synchronized Bar copy() { - Bar res = new Bar(symbol); - - res.open = open; - res.high = high; - res.low = low; - res.close = close; - - return res; - } - - /** - * Updates this bar with last price. - * - * @param price Price. - */ - public synchronized void update(double price) { - if (open == 0) - open = price; - - high = Math.max(high, price); - low = Math.min(low, price); - close = price; - } - - /** - * Updates this bar with next bar. - * - * @param bar Next bar. - */ - public synchronized void update(Bar bar) { - if (open == 0) - open = bar.open; - - high = Math.max(high, bar.high); - low = Math.min(low, bar.low); - close = bar.close; - } - - /** - * @return Symbol. - */ - public String symbol() { - return symbol; - } - - /** - * @return Open price. - */ - public double open() { - return open; - } - - /** - * @return High price. - */ - public double high() { - return high; - } - - /** - * @return Low price. - */ - public double low() { - return low; - } - - /** - * @return Close price. - */ - public double close() { - return close; - } - - /** {@inheritDoc} */ - @Override public String affinityKey() { - return symbol; - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return "Bar [symbol=" + symbol + ", open=" + open + ", high=" + high + ", low=" + low + - ", close=" + close + ']'; - } - } - - /** - * The first stage where 1 second bars are built. - */ - @SuppressWarnings({ "PublicInnerClass", "unchecked" }) - public static class FirstStage implements StreamerStage<Quote> { - /** {@inheritDoc} */ - @Override public String name() { - return getClass().getSimpleName(); - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Quote> quotes) { - StreamerWindow win = ctx.window("stage1"); - - // Add numbers to window. - win.enqueueAll(quotes); - - Collection<Quote> polled = win.pollEvictedBatch(); - - if (!polled.isEmpty()) { - Map<String, Bar> map = new HashMap<>(); - - for (Quote quote : polled) { - String symbol = quote.symbol(); - - Bar bar = map.get(symbol); - - if (bar == null) - map.put(symbol, bar = new Bar(symbol)); - - bar.update(quote.price()); - } - - return Collections.<String, Collection<?>>singletonMap(ctx.nextStageName(), map.values()); - } - - return null; - } - } - - /** - * The second stage where 2 second bars are built. - */ - @SuppressWarnings({ "PublicInnerClass", "unchecked" }) - public static class SecondStage implements StreamerStage<Bar> { - /** {@inheritDoc} */ - @Override public String name() { - return getClass().getSimpleName(); - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Bar> bars) { - ConcurrentMap<String, Bar> loc = ctx.localSpace(); - - StreamerWindow win = ctx.window("stage2"); - - // Add numbers to window. - win.enqueueAll(bars); - - Collection<Bar> polled = win.pollEvictedBatch(); - - if (!polled.isEmpty()) { - Map<String, Bar> map = new HashMap<>(); - - for (Bar polledBar : polled) { - String symbol = polledBar.symbol(); - - Bar bar = map.get(symbol); - - if (bar == null) - map.put(symbol, bar = new Bar(symbol)); - - bar.update(polledBar); - } - - loc.putAll(map); - } - - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingRunningAverageExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingRunningAverageExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingRunningAverageExample.java deleted file mode 100644 index 8df5dc6..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingRunningAverageExample.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Example to demonstrate how to compute a running average. In this example - * random numbers are being streamed into the system and the streamer - * continuously maintains a running average over last {@code 500} numbers. - * <p> - * Remote nodes should always be started with special configuration file: - * {@code 'ignite.{sh|bat} examples/config/example-streamer.xml'}. - * When starting nodes this way JAR file containing the examples code - * should be placed to {@code IGNITE_HOME/libs} folder. You can build - * {@code ignite-examples.jar} by running {@code mvn package} in - * {@code IGNITE_HOME/examples} folder. After that {@code ignite-examples.jar} - * will be generated by Maven in {@code IGNITE_HOME/examples/target} folder. - * <p> - * Alternatively you can run {@link StreamingNodeStartup} in another JVM which will start node - * with {@code examples/config/example-streamer.xml} configuration. - */ -public class StreamingRunningAverageExample { - /** - * Main method. - * - * @param args Parameters. - * @throws Exception If failed. - */ - public static void main(String[] args) throws Exception { - Ignite ignite = Ignition.start("examples/config/example-streamer.xml"); - - System.out.println(); - System.out.println(">>> Streaming running average example started."); - - final IgniteStreamer streamer = ignite.streamer("running-average"); - - final int rndRange = 100; - - // This thread executes a query across all nodes - // to collect a running average from all of them. - // During reduce step the results are collected - // and reduced into one average value. - Thread qryThread = new Thread(new Runnable() { - @SuppressWarnings("BusyWait") - @Override public void run() { - while (!Thread.interrupted()) { - try { - try { - Thread.sleep(3000); - } - catch (InterruptedException ignore) { - return; - } - - // Running average. - double avg = streamer.context().reduce( - new IgniteClosure<StreamerContext, Average>() { - @Override public Average apply(StreamerContext ctx) { - return ctx.<String, Average>localSpace().get("avg"); - } - }, - new IgniteReducer<Average, Double>() { - private Average avg = new Average(); - - @Override public boolean collect(@Nullable Average a) { - if (a != null) - avg.add(a); - - return true; - } - - @Override public Double reduce() { - return avg.average(); - } - } - ); - - System.out.println("Got streamer query result [avg=" + avg + ", idealAvg=" + (rndRange / 2) + ']'); - } - catch (IgniteException e) { - System.out.println("Failed to execute streamer query: " + e); - } - } - } - }); - - // This thread continuously stream events - // into the system. - Thread evtThread = new Thread(new Runnable() { - @Override public void run() { - Random rnd = new Random(); - - while (!Thread.interrupted()) { - try { - streamer.addEvent(rnd.nextInt(rndRange)); - } - catch (IgniteException e) { - System.out.println("Failed to add streamer event: " + e); - } - } - } - }); - - try { - System.out.println(">>> Starting streamer query and producer threads. Press enter to stop this example."); - - qryThread.start(); - evtThread.start(); - - try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) { - in.readLine(); - } - } - finally { - qryThread.interrupt(); - evtThread.interrupt(); - - qryThread.join(); - evtThread.join(); - - Ignition.stopAll(false); - } - } - - /** - * Sample streamer stage to compute average. - */ - public static class StreamerStage implements org.apache.ignite.streamer.StreamerStage<Integer> { - /** {@inheritDoc} */ - @Override public String name() { - return "exampleStage"; - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> evts) { - ConcurrentMap<String, Average> loc = ctx.localSpace(); - - Average avg = loc.get("avg"); - - // Store average in local space if it was not done before. - if (avg == null) { - Average old = loc.putIfAbsent("avg", avg = new Average()); - - if (old != null) - avg = old; - } - - // For every input event, update the average. - for (Integer e : evts) - avg.add(e, 1); - - StreamerWindow<Integer> win = ctx.window(); - - // Add input events to window. - win.enqueueAll(evts); - - while (true) { - Integer e = win.pollEvicted(); - - if (e == null) - break; - - // Subtract evicted events from running average. - avg.add(-e, -1); - } - - return null; - } - } - - /** - * Class to help calculate average. - */ - public static class Average { - /** */ - private int total; - - /** */ - private int cnt; - - /** - * Adds one average to another. - * - * @param avg Average to add. - */ - public void add(Average avg) { - int total; - int cnt; - - synchronized (avg) { - total = avg.total; - cnt = avg.cnt; - } - - add(total, cnt); - } - - /** - * Adds passed in values to current values. - * <p> - * Note that this method is synchronized because multiple - * threads will be updating the same average instance concurrently. - * - * @param total Total delta. - * @param cnt Count delta. - */ - public synchronized void add(int total, int cnt) { - this.total += total; - this.cnt += cnt; - } - - /** - * Calculates current average based on total value and count. - * <p> - * Note that this method is synchronized because multiple - * threads will be updating the same average instance concurrently. - - * @return Running average. - */ - public synchronized double average() { - return (double)total / cnt; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java deleted file mode 100644 index e1ded7e..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <!-- Package description. --> - * Contains streaming examples. - */ -package org.apache.ignite.examples.streaming; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 2ae1bc8..3e35f4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -44,7 +44,6 @@ import java.util.concurrent.*; * <li>{@link IgniteCache} - functionality for in-memory distributed cache.</li> * <li>{@link IgniteDataStreamer} - functionality for loading data large amounts of data into cache.</li> * <li>{@link IgniteFileSystem} - functionality for distributed Hadoop-compliant in-memory file system and map-reduce.</li> - * <li>{@link IgniteStreamer} - functionality for streaming events workflow with queries and indexes into rolling windows.</li> * <li>{@link IgniteScheduler} - functionality for scheduling jobs using UNIX Cron syntax.</li> * <li>{@link IgniteCompute} - functionality for executing tasks and closures on all grid nodes (inherited form {@link ClusterGroup}).</li> * <li>{@link IgniteMessaging} - functionality for topic-based message exchange on all grid nodes (inherited form {@link ClusterGroup}).</li> @@ -234,22 +233,6 @@ public interface Ignite extends AutoCloseable { public Collection<IgniteFileSystem> fileSystems(); /** - * Gets an instance of streamer by name, if one does not exist then - * {@link IllegalArgumentException} will be thrown. - * - * @param name Streamer name. - * @return Streamer for given name. - */ - public IgniteStreamer streamer(@Nullable String name); - - /** - * Gets all instances of streamers. - * - * @return Collection of all streamer instances. - */ - public Collection<IgniteStreamer> streamers(); - - /** * Will get an atomic sequence from cache and create one if it has not been created yet and {@code create} flag * is {@code true}. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java deleted file mode 100644 index f7e20cd..0000000 --- a/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite; - -import org.apache.ignite.streamer.*; - -import java.util.*; - -/** - * Streamer interface. Streamer provides an easy way to process large (possibly infinite) stream of - * events. Event can be of any object type, different types of events can be submitted to streamer. Each event - * is processed by one or more {@link org.apache.ignite.streamer.StreamerStage}, a set of stages event passed through is called pipeline. - * <p> - * For each submitted group of events streamer determines one or more execution nodes that will process this - * group of events. Execution nodes are determined by {@link org.apache.ignite.streamer.StreamerEventRouter}. Execution nodes run stages - * with received events. After stage execution streamer gets an optional set of events that should be processed - * further. The process is repeated until stage returns empty map. After stage returned empty map pipeline execution - * for given group of events is finished. - * <p> - * It is guaranteed that group of events returned by router will be neither split nor concatenated with - * any other group of events and will be passed to stage as is. Event processing order is not guaranteed, group that - * was submitted second can be processed earlier then first submitted group. - * <p> - * If {@link StreamerConfiguration#isAtLeastOnce()} is set to {@code false}, then event execution is not tracked - * by streamer and any occurred failure will be reported to failure listener on node on which failure happened. If - * this configuration property is set to {@code true}, then streamer will cancel current pipeline execution in case - * of failure and will try to execute pipeline from the beginning. If failover cannot be succeeded or maximum number - * of failover attempts is exceeded, then listener will be notified on node which originated pipeline execution. - * - * @see org.apache.ignite.streamer.StreamerStage - * @see org.apache.ignite.streamer.StreamerEventRouter - */ -public interface IgniteStreamer { - /** - * Gets streamer configuration. - * - * @return Streamer configuration. - */ - public StreamerConfiguration configuration(); - - /** - * Gets streamer name. - * - * @return Streamer name, or {@code null} for default no-name streamer. - */ - public String name(); - - /** - * Submits group of events for processing. This group of events will be processed on default stage, - * i.e. stage that is the first in the streamer stages list. - * - * @param evt Event to add. - * @param evts Optional events to add. - * @throws IgniteException If event submission failed. - */ - public void addEvent(Object evt, Object... evts) throws IgniteException; - - /** - * Submits group of events to streamer. Events will be processed from a stage with specified name. - * - * @param stageName Stage name to start with. - * @param evt Event tp process. - * @param evts Optional events. - * @throws IgniteException If event submission failed. - */ - public void addEventToStage(String stageName, Object evt, Object... evts) throws IgniteException; - - /** - * Submits group of events for processing. This group of events will be processed on default stage, - * i.e. stage that is the first in the streamer stages list. - * - * @param evts Events to add. - * @throws IgniteException If event submission failed. - */ - public void addEvents(Collection<?> evts) throws IgniteException; - - /** - * Submits events to streamer. Events will be processed from a stage with specified name. - * - * @param stageName Stage name to start with. - * @param evts Events to process. - * @throws IgniteException If event submission failed. - */ - public void addEventsToStage(String stageName, Collection<?> evts) throws IgniteException; - - /** - * Gets streamer context. Streamer context provides access to streamer local space on this node, configured - * streamer windows and provides various methods to run streamer queries. - * - * @return Streamer context. - */ - public StreamerContext context(); - - /** - * Adds streamer failure listener. Listener will be notified on node on which failure occurred in case if - * {@link StreamerConfiguration#isAtLeastOnce()} is set to {@code false} and on node which originated - * pipeline execution otherwise. - * - * @param lsnr Listener to add. - */ - public void addStreamerFailureListener(StreamerFailureListener lsnr); - - /** - * Removes streamer failure listener. - * - * @param lsnr Listener to remove. - */ - public void removeStreamerFailureListener(StreamerFailureListener lsnr); - - /** - * Gets current streamer metrics. - * - * @return Streamer metrics. - */ - public StreamerMetrics metrics(); - - /** - * Resets all configured streamer windows by calling {@link org.apache.ignite.streamer.StreamerWindow#reset()} on each and - * clears local space. - * <p> - * This is local method, it will clear only local windows and local space. Note that windows and - * space will not be cleaned while stages are executing, i.e. all currently running stages will - * have to complete before streamer can be reset. - */ - public void reset(); - - /** - * Resets all streamer metrics. - */ - public void resetMetrics(); - - /** - * Explicitly sets deployment class. Will be used only if peer-to-peer class loading - * is enabled. - * - * @param depCls Deployment class. - */ - public void deployClass(Class<?> depCls); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java index 983a962..402273e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java @@ -168,15 +168,6 @@ public interface ClusterGroup { public ClusterGroup forClientNodes(String cacheName); /** - * Creates projection for all nodes that have streamer with specified name running. - * - * @param streamerName Streamer name. - * @param streamerNames Optional additional streamer names to include into projection. - * @return Projection over nodes that have specified streamer running. - */ - public ClusterGroup forStreamer(String streamerName, @Nullable String... streamerNames); - - /** * Gets grid projection consisting from the nodes in this projection excluding the local node. * * @return Grid projection consisting from the nodes in this projection excluding the local node, if any. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 7c6b205..21a7d48 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -37,7 +37,6 @@ import org.apache.ignite.spi.failover.*; import org.apache.ignite.spi.indexing.*; import org.apache.ignite.spi.loadbalancing.*; import org.apache.ignite.spi.swapspace.*; -import org.apache.ignite.streamer.*; import javax.cache.event.*; import javax.cache.expiry.*; @@ -346,9 +345,6 @@ public class IgniteConfiguration { /** IGFS configuration. */ private FileSystemConfiguration[] igfsCfg; - /** Streamer configuration. */ - private StreamerConfiguration[] streamerCfg; - /** Service configuration. */ private ServiceConfiguration[] svcCfgs; @@ -444,7 +440,6 @@ public class IgniteConfiguration { segResolvers = cfg.getSegmentationResolvers(); sndRetryCnt = cfg.getNetworkSendRetryCount(); sndRetryDelay = cfg.getNetworkSendRetryDelay(); - streamerCfg = cfg.getStreamerConfiguration(); svcCfgs = cfg.getServiceConfiguration(); sysPoolSize = cfg.getSystemThreadPoolSize(); timeSrvPortBase = cfg.getTimeServerPortBase(); @@ -1741,24 +1736,6 @@ public class IgniteConfiguration { } /** - * Gets streamers configurations. - * - * @return Streamers configurations. - */ - public StreamerConfiguration[] getStreamerConfiguration() { - return streamerCfg; - } - - /** - * Sets streamer configuration. - * - * @param streamerCfg Streamer configuration. - */ - public void setStreamerConfiguration(StreamerConfiguration... streamerCfg) { - this.streamerCfg = streamerCfg; - } - - /** * Gets hadoop configuration. * * @return Hadoop configuration. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 3289ee3..ad7d562 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.processors.segmentation.*; import org.apache.ignite.internal.processors.service.*; import org.apache.ignite.internal.processors.session.*; -import org.apache.ignite.internal.processors.streamer.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; @@ -264,13 +263,6 @@ public interface GridKernalContext extends Iterable<GridComponent> { public IgfsHelper igfsHelper(); /** - * Gets stream processor. - * - * @return Stream processor. - */ - public GridStreamProcessor stream(); - - /** * Gets event continuous processor. * * @return Event continuous processor. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 58970c9..2759c2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -56,7 +56,6 @@ import org.apache.ignite.internal.processors.segmentation.*; import org.apache.ignite.internal.processors.service.*; import org.apache.ignite.internal.processors.session.*; import org.apache.ignite.internal.processors.spring.*; -import org.apache.ignite.internal.processors.streamer.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; @@ -219,10 +218,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable private GridAffinityProcessor affProc; /** */ - @GridToStringInclude - private GridStreamProcessor streamProc; - - /** */ @GridToStringExclude private GridContinuousProcessor contProc; @@ -458,8 +453,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable igfsProc = (IgfsProcessorAdapter)comp; else if (comp instanceof GridOffHeapProcessor) offheapProc = (GridOffHeapProcessor)comp; - else if (comp instanceof GridStreamProcessor) - streamProc = (GridStreamProcessor)comp; else if (comp instanceof GridContinuousProcessor) contProc = (GridContinuousProcessor)comp; else if (comp instanceof HadoopProcessorAdapter) @@ -590,11 +583,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public GridStreamProcessor stream() { - return streamProc; - } - - /** {@inheritDoc} */ @Override public GridDeploymentManager deploy() { return depMgr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index ab1038c..6b04bc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -56,7 +56,6 @@ import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.processors.segmentation.*; import org.apache.ignite.internal.processors.service.*; import org.apache.ignite.internal.processors.session.*; -import org.apache.ignite.internal.processors.streamer.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; @@ -754,7 +753,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor((GridProcessor)SCHEDULE.createOptional(ctx)); startProcessor(new GridRestProcessor(ctx)); startProcessor(new DataStreamProcessor(ctx)); - startProcessor(new GridStreamProcessor(ctx)); startProcessor((GridProcessor) IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration()))); startProcessor(new GridContinuousProcessor(ctx)); startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ? @@ -2383,30 +2381,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** {@inheritDoc} */ - @Override public IgniteStreamer streamer(@Nullable String name) { - guard(); - - try { - return ctx.stream().streamer(name); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteStreamer> streamers() { - guard(); - - try { - return ctx.stream().streamers(); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ @Override public IgniteProductVersion version() { return VER; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index d181b39..822e64f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -49,7 +49,6 @@ import org.apache.ignite.spi.loadbalancing.*; import org.apache.ignite.spi.loadbalancing.roundrobin.*; import org.apache.ignite.spi.swapspace.file.*; import org.apache.ignite.spi.swapspace.noop.*; -import org.apache.ignite.streamer.*; import org.apache.ignite.thread.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -1616,17 +1615,6 @@ public class IgnitionEx { myCfg.setFileSystemConfiguration(clone); } - StreamerConfiguration[] streamerCfgs = myCfg.getStreamerConfiguration(); - - if (streamerCfgs != null) { - StreamerConfiguration[] clone = streamerCfgs.clone(); - - for (int i = 0; i < streamerCfgs.length; i++) - clone[i] = new StreamerConfiguration(streamerCfgs[i]); - - myCfg.setStreamerConfiguration(clone); - } - initializeDefaultSpi(myCfg); initializeDefaultCacheConfiguration(myCfg);