Repository: incubator-edgent Updated Branches: refs/heads/master 9e4934ea4 -> ee9ac2845
[Edgent-409] add analytics.math3.Aggregations - analytics.math3 - add Aggregations - add UnivariateAggregate (general math3.json.JsonUniv...) - add UnivariateAggregator (general math3.json.JsonUniv...) - add package-info - analytics.math3.stats - add tuple-type-independent stats - add Statistics2 (general Statistics) - add Regression2 (general Regression) - add StorelessStatistic (general JsonStorelessStatistic) - add package scoped Count - add package scoped OLS (general JsonOLS) - add Statistics2Test Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/99e9f151 Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/99e9f151 Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/99e9f151 Branch: refs/heads/master Commit: 99e9f1519ddf900c996449d74c3867db099dbbce Parents: 9e4934e Author: Dale LaBossiere <[email protected]> Authored: Thu Apr 20 11:35:41 2017 -0400 Committer: Dale LaBossiere <[email protected]> Committed: Thu Apr 20 11:39:57 2017 -0400 ---------------------------------------------------------------------- .../edgent/analytics/math3/Aggregations.java | 313 +++++++++++++ .../analytics/math3/UnivariateAggregate.java | 48 ++ .../analytics/math3/UnivariateAggregator.java | 47 ++ .../analytics/math3/json/JsonAnalytics.java | 2 + .../edgent/analytics/math3/package-info.java | 24 + .../edgent/analytics/math3/stat/Count.java | 40 ++ .../apache/edgent/analytics/math3/stat/OLS.java | 80 ++++ .../analytics/math3/stat/Regression2.java | 56 +++ .../edgent/analytics/math3/stat/Statistic2.java | 94 ++++ .../math3/stat/StorelessStatistic.java | 58 +++ .../test/analytics/math3/Statistics2Test.java | 460 +++++++++++++++++++ 11 files changed, 1222 insertions(+) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/Aggregations.java ---------------------------------------------------------------------- diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/Aggregations.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/Aggregations.java new file mode 100644 index 0000000..4f037ad --- /dev/null +++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/Aggregations.java @@ -0,0 +1,313 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.edgent.analytics.math3; + +import java.util.Collection; +import java.util.HashMap; + +import org.apache.edgent.analytics.math3.json.JsonAnalytics; +import org.apache.edgent.analytics.math3.stat.Regression2; +import org.apache.edgent.analytics.math3.stat.Statistic2; +import org.apache.edgent.function.Function; +import org.apache.edgent.function.ToDoubleFunction; +import org.apache.edgent.topology.TWindow; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; + +/** + * Apache Common Math analytics for Collections. 
+ * + * <p>These operations are typically used when processing a collection of tuples + * in a non-partitioned or partitioned {@link TWindow}. + * + * <p>Simple sum aggregation functions are also provided (helpful given the avoidance + * of depending on Java8 Streams). + * + * <p>Example: compute a batched MEAN aggregation for a simple unpartitioned window of + * numeric {@code TStream<Double>} where the desired result is a {@code TStream<Double>} of + * the MEAN values: + * <pre>{@code + * TStream<Double> pressureReadings = ... + *TWindow<Double,Integer> window = pressureReadings.last(5, Functions.unpartitioned()); + *TStream<Double> meanPressureReadings = window.batch( + * (list, partition) -> Aggregations.aggregate(list, Statistic2.MEAN)); + * }</pre> + * + * <p>Example: compute the MEAN and SLOPE, capturing the results in a {@link ResultMap}: + * <pre>{@code + * TStream<Double> pressureReadings = ... + *TWindow<Double,Integer> window = pressureReadings.last(5, Functions.unpartitioned()); + *TStream<ResultMap> meanPressureReadings = window.batch( + * (list, partition) -> Aggregations.aggregateN(list, Statistic2.MEAN, Regression2.SLOPE)); + * }</pre> + * + * <p>Example: compute multiple aggregations on multiple variables in a tuple T, + * capturing the results in a {@link MvResultMap}: + * <pre>{@code + *class SensorReading { + * double getTemp() {...}; + * double getPressure() {...}; + * ... }; + *TStream<SensorReading> sensorReadings = ... 
+ *TWindow<SensorReading,Integer> window = sensorReadings.last(5, Functions.unpartitioned()); + *TStream<MvResultMap> aggregations = window.batch( + * (list, partition) -> { + * ResultMap pressureResults = Aggregations.aggregateN(list, t -> t.getPressure(), Statistic2.MEAN, Regression2.SLOPE)); + * ResultMap tempResults = Aggregations.aggregateN(list, t -> t.getTemp(), Statistic2.MAX)); + * MvResultMap results = Aggregations.newMvResults(); + * results.put("pressure", pressureResults); + * results.put("temp", tempResults); + * return results; + * }); + * }</pre> + * + * <p>Example: convert a {@code TStream<ResultMap>} or {@code TStream<MvResultMap>} to a JsonObject: + * + * <pre>{@code + * TStream<ResultMap> resultMap = ... + * TStream<JsonObject> joResultMap = resultMap.map(Aggregations.newResultsToJsonFn()); + * }</pre> + * + * + * <p>Background: + * {@link JsonAnalytics} predates this class. Use of JsonAnalytics for computing + * Apache Commons Math aggregations requires: + * <ul> + * <li>the input stream tuple type must be a JsonObject and the user must identify + * the property that contains the value to be aggregated.</li> + * <li>batched window aggregations are not supported (they could be added).</li> + * <li>the aggregation result tuple is a JsonObject that holds a user-specified + * property for a window partition key and a user-specified property + * for a JsonObject holding the UnivariateAggregation results by name (e.g., "MEAN"). + * Accessing a particular aggregation results in the JsonObject + * is somewhat cumbersome.</li> + * </ul> + * + * <p>As a result, using JsonAnalytics for simple cases can be a bit unintuitive and cumbersome. + * + * <p>For example, to JsonAnalytics for a simple case of a continuous aggregation + * of {@code TStream<Double>} => {@code TStream<Double>} of MEAN values: + * + * <pre>{@code + * TStream<Double> pressureReadings = ... 
+ * + * // transform TStream<Double> to TStream<JsonObject> + * TStream<JsonObject> joPressureReadings = pressureReadings.map(JsonFunctions.valueOfDouble("pressure")); + * + * // aggregate + * TWindow<JsonObject,JsonElement> window = joPressureReadings.last(5, JsonFunctions.unpartitioned()); + * TStream<JsonObject> results = JsonAnalytics.aggregate(window, "partition", "pressure", Statistic.MEAN); + * + * // transform to TStream<Double> mean results + * TStream<Double> meanPressureReadings = results.map(jo -> jo.get("pressure").getAsObject().get("MEAN").getAsDouble()); + * }</pre> + * + * <p>Using Aggregations it's simply: + * + * <pre>{@code + *TWindow<Double,Integer> window = pressureReadings.last(5, Functions.unpartitioned()); + *TStream<Double> meanPressureReadings = window.aggregate( + * (list, partition) -> Aggregations.aggregate(list, Statistic2.MEAN)); + * }</pre> + * + * + * @see Statistic2 + * @see Regression2 + * @see JsonAnalytics + */ +public class Aggregations { + + /** + * Perform a sum of numbers treated as double values, accumulating in a double. + * An empty list yields a 0.0 result. + * @param list numbers to sum + * @return the sum + */ + public static Double sum(Collection<? extends Number> list) { + double sum = 0.0; + for (Number v : list) + sum += v.doubleValue(); + return sum; + } + + /** + * Perform a sum of numbers treated as long values, accumulating in a long. + * More efficient than sum() for non-floating point values. + * An empty list yields a 0 result. + * @param list numbers to sum + * @return the sum + */ + public static long sumInts(Collection<? extends Number> list) { + long sum = 0; + for (Number v : list) + sum += v.longValue(); + return sum; + } + + /** + * Aggregation results for a single aggregated variable. + */ + public static class ResultMap extends HashMap<UnivariateAggregate,Double> { + private static final long serialVersionUID = 1L; + } + + /** + * Create a new empty {@link ResultMap}. + * @return the ResultMap. 
+ */ + public static ResultMap newResults() { return new ResultMap(); } + + /** + * Create a {@link Function} whose {@code apply(ResultMap)} converts the value + * to a {@code JsonObject}. The ResultMap's key's names are the JsonObject property + * names and the property value is the key's value. + * + * <p>An example resulting JsonObject would be <pre>{ "MEAN":3.75, "MIN":2.0 }</pre>. + * @return the JsonObject + */ + public static Function<ResultMap,JsonObject> newResultsToJson() { + Gson gson = new Gson(); + return (ResultMap resultMap) -> gson.toJsonTree(resultMap).getAsJsonObject(); + } + + /** + * Aggregation results for a multiple aggregated variables. + * <p>The name of the aggregated variable is typically the key for the variable's {@link ResultMap}. + */ + public static class MvResultMap extends HashMap<String,ResultMap> { + private static final long serialVersionUID = 1L; + }; + + /** + * Create a new empty {@link MvResultMap}. + * @return the MvResultMap. + */ + public static MvResultMap newMvResults() { return new MvResultMap(); } + + /** + * Create a {@link Function} whose {@code apply(MvResultMap)} converts the value + * to a {@code JsonObject}. The MvResultMap's key's names are the JsonObject property + * names and the property value is the JsonObject for the key's ResultMap value. + * + * <p>An example resulting JsonObject would be + * <pre>{ "temperature":{"MEAN":123.75, "MAX":180.5}, "pressure":{"MAX":13.0} }</pre>. + * + * @return the JsonObject + */ + public static Function<MvResultMap,JsonObject> newMvResultsToJson() { + Gson gson = new Gson(); + return (MvResultMap mvResultMap) -> gson.toJsonTree(mvResultMap).getAsJsonObject(); + } + + /** + * Perform the specified {@link UnivariateAggregate} on a Collection of {@link Number}. + * + * <p>A null result is returned if the collection is empty. + * An aggregation result may be null under other conditions, + * e.g., a Regression2.SLOPE where the minimum number of samples has not been met. 
+ * + * @param c the Collection to aggregate + * @param aggregate the aggregation to perform + * @return the aggregation result, may be null. + */ + public static Double aggregate(Collection<? extends Number> c, UnivariateAggregate aggregate) { + return aggregateN(c, aggregate).get(aggregate); + } + + /** + * Perform the specified {@link UnivariateAggregate} on a Collection of {@link Number}. + * + * <p>If the collection is empty an empty ResultMap is returned. + * The ResultMap does not contain an entry for an aggregation with a null, + * e.g., a Regression2.SLOPE where the minimum number of samples has not been met. + * + * @param c the Collection to aggregate + * @param aggregates the aggregations to perform + * @return a {@link ResultMap} containing the variable's aggregation results + */ + public static ResultMap aggregateN(Collection<? extends Number> c, UnivariateAggregate... aggregates) { + return aggregateN(c, num -> num.doubleValue(), aggregates); + } + + /** + * Perform the specified {@link UnivariateAggregate} a Collection of {@code T} + * using the specified {@link ToDoubleFunction getter} to extract the + * variable to aggregate. + * + * <p>A null result is returned if the collection is empty. + * An aggregation result may be null under other conditions, + * e.g., a Regression2.SLOPE where the minimum number of samples has not been met. + * + * @param c the Collection to aggregate + * @param getter function that returns the variable to aggregate from a {@code T} + * @param aggregate the aggregation to perform + * @return the aggregation result, may be null. + */ + public static <T> Double aggregate(Collection<T> c, ToDoubleFunction<T> getter, UnivariateAggregate aggregate) { + return aggregateN(c, getter, aggregate).get(aggregate); + } + + /** + * Perform the specified {@link UnivariateAggregate} a Collection of {@code T} + * using the specified {@link ToDoubleFunction getter} to extract the + * variable to aggregate. 
+ * + * <p>If the collection is empty an empty ResultMap is returned. + * The ResultMap does not contain an entry for an aggregation with a null, + * e.g., a Regression2.SLOPE where the minimum number of samples has not been met. + * + * @param c the Collection to aggregate + * @param getter function that returns the variable to aggregate from a {@code T} + * @param aggregates the aggregations to perform + * @return a {@link ResultMap} containing the variable's aggregation results + */ + public static <T> ResultMap aggregateN(Collection<T> c, ToDoubleFunction<T> getter, UnivariateAggregate... aggregates) { + + final int n = c.size(); + final ResultMap result = newResults(); + + if (n != 0) { + // get new UnivariateAggregate instances for this aggregation + final UnivariateAggregator[] aggregators = new UnivariateAggregator[aggregates.length]; + for (int i = 0; i < aggregates.length; i++) { + aggregators[i] = aggregates[i].get(); + } + for (UnivariateAggregator agg : aggregators) { + agg.clear(n); + } + for (T value : c) { + Double d = getter.applyAsDouble(value); + for (UnivariateAggregator agg : aggregators) { + agg.increment(d); + } + } + for (UnivariateAggregator agg : aggregators) { + // do as JsonAnalytics did and omit Nan/Inf results from the map. 
+ double rv = agg.getResult(); + + if (Double.isFinite(rv)) + result.put(agg.getAggregate(), Double.valueOf(rv)); + } + } + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregate.java ---------------------------------------------------------------------- diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregate.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregate.java new file mode 100644 index 0000000..903ee7f --- /dev/null +++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregate.java @@ -0,0 +1,48 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.edgent.analytics.math3; + +import org.apache.edgent.function.Supplier; + +/** + * Univariate aggregate for a tuple. + * This is the declaration of the aggregate that + * application use when declaring a topology. + * <P> + * Implementations are typically enums such + * as {@link org.apache.edgent.analytics.math3.stat.Statistic2 Statistic2}. 
+ * </P> + * <P> + * Each call to {@code get()} must return a new + * {@link UnivariateAggregator aggregator} + * that implements the required aggregate. + * </P> + * + * @see Aggregations + */ +public interface UnivariateAggregate extends Supplier<UnivariateAggregator>{ + + /** + * Name of the aggregate. + * The returned value is used as the JSON key containing + * the result of the aggregation. + * @return Name of the aggregate. + */ + public String name(); +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregator.java ---------------------------------------------------------------------- diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregator.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregator.java new file mode 100644 index 0000000..52a1730 --- /dev/null +++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregator.java @@ -0,0 +1,47 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.edgent.analytics.math3; + +/** + * Univariate aggregator for tuples. 
+ * This is the runtime implementation interface + * of {@link UnivariateAggregate} defined aggregate. + */ +public interface UnivariateAggregator { + + UnivariateAggregate getAggregate(); + + /** + * Clear the aggregator to prepare for a new aggregation. + * @param n Number of tuples to be aggregated. + */ + void clear(int n); + + /** + * Add a value to the aggregation. + * @param value Value to be added. + */ + void increment(double value); + + /** + * Get the aggregation result. It may be a NaN (empty collection, etc) or Infinite value. + * @return the result + */ + double getResult(); +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java ---------------------------------------------------------------------- diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java index c6e9108..cfe1a78 100644 --- a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java +++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java @@ -21,6 +21,7 @@ package org.apache.edgent.analytics.math3.json; import java.util.List; import org.apache.commons.math3.util.Pair; +import org.apache.edgent.analytics.math3.Aggregations; import org.apache.edgent.function.BiFunction; import org.apache.edgent.function.ToDoubleFunction; import org.apache.edgent.topology.TStream; @@ -32,6 +33,7 @@ import com.google.gson.JsonObject; /** * Apache Common Math analytics for streams with JSON tuples. 
* + * @see Aggregations */ public class JsonAnalytics { http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/package-info.java ---------------------------------------------------------------------- diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/package-info.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/package-info.java new file mode 100644 index 0000000..e31b29b --- /dev/null +++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/package-info.java @@ -0,0 +1,24 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +/** + * Analytics using Apache Commons Math. 
+ */ +package org.apache.edgent.analytics.math3; + http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Count.java ---------------------------------------------------------------------- diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Count.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Count.java new file mode 100644 index 0000000..102f507 --- /dev/null +++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Count.java @@ -0,0 +1,40 @@ +package org.apache.edgent.analytics.math3.stat; + +import org.apache.commons.math3.stat.descriptive.AbstractStorelessUnivariateStatistic; +import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic; + +/** + * The number of items in the collection being aggregated. + * + * Need this semi-hack to be able to capture the number + * of items in an aggregation in a ResultMap. 
+ */ +class Count extends AbstractStorelessUnivariateStatistic { + int n; + + @Override + public long getN() { + return n; + } + + @Override + public void clear() { + n = 0; + } + + @Override + public StorelessUnivariateStatistic copy() { + return new Count(); + } + + @Override + public double getResult() { + return n; + } + + @Override + public void increment(double arg0) { + n++; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/OLS.java ---------------------------------------------------------------------- diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/OLS.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/OLS.java new file mode 100644 index 0000000..c798101 --- /dev/null +++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/OLS.java @@ -0,0 +1,80 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.edgent.analytics.math3.stat; + +import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression; +import org.apache.edgent.analytics.math3.UnivariateAggregate; +import org.apache.edgent.analytics.math3.UnivariateAggregator; + +/** + * Wrapper over commons math3 {@code OLSMultipleLinearRegression} + */ +class OLS implements UnivariateAggregator { + + private final Regression2 type; + private final OLSMultipleLinearRegression ols = new OLSMultipleLinearRegression(); + private double[] values; + private int yOffset; + + OLS(Regression2 type) { + this.type = type; + } + + @Override + public UnivariateAggregate getAggregate() { + return type; + } + + @Override + public void clear(int n) { + values = new double[n*2]; + yOffset = 0; + } + + @Override + public void increment(double v) { + values[yOffset] = v; + yOffset+=2; + } + + void setSampleData() { + // Fill in the x values + for (int x = 0; x < values.length/2; x++) + values[(x*2)+1] = x; + ols.newSampleData(values, values.length/2, 1); + } + + @Override + public double getResult() { + // If there are no values or only a single + // value then we cannot calculate the slope. + if (values.length <= 2) + return Double.NaN; + + setSampleData(); + double[] regressionParams = ols.estimateRegressionParameters(); + if (regressionParams.length >= 2) { + // [0] is the constant (zero'th order) + // [1] is the first order , which we use as the slope. 
+ final double slope = regressionParams[1]; + return slope; + } + return Double.NaN; + } +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression2.java ---------------------------------------------------------------------- diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression2.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression2.java new file mode 100644 index 0000000..4524e95 --- /dev/null +++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression2.java @@ -0,0 +1,56 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.edgent.analytics.math3.stat; + +import org.apache.edgent.analytics.math3.UnivariateAggregate; +import org.apache.edgent.analytics.math3.UnivariateAggregator; + +/** + * Univariate regression aggregates. + * + * Unfortunately the implicitly-Json Regression wasn't named JsonRegresson. + */ +public enum Regression2 implements UnivariateAggregate { + + /** + * Calculate the slope of a single variable. + * The slope is calculated using the first + * order of a ordinary least squares + * linear regression. 
+ * The list of values for the + * single variable are processed as Y-values + * that are evenly spaced on the X-axis. + * <BR> + * This is useful as a simple determination + * if the variable is increasing or decreasing. + * <BR> + * The slope value is represented as a {@code double} + * with the key {@code SLOPE} in the aggregate result. + * <BR> + * If the collection to be aggregated contains fewer than + * two values then no regression is performed + * (an associated UnivariateAggregator.getResult() returns NaN). + */ + SLOPE() { + @Override + public UnivariateAggregator get() { + return new OLS(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic2.java ---------------------------------------------------------------------- diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic2.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic2.java new file mode 100644 index 0000000..d731dac --- /dev/null +++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic2.java @@ -0,0 +1,94 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.edgent.analytics.math3.stat; + +import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic; +import org.apache.commons.math3.stat.descriptive.moment.Mean; +import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; +import org.apache.commons.math3.stat.descriptive.rank.Max; +import org.apache.commons.math3.stat.descriptive.rank.Min; +import org.apache.commons.math3.stat.descriptive.summary.Sum; +import org.apache.edgent.analytics.math3.Aggregations; +import org.apache.edgent.analytics.math3.UnivariateAggregate; +import org.apache.edgent.analytics.math3.UnivariateAggregator; + +/** + * Statistic implementations. + * + * Univariate statistic aggregate calculations against a value + * extracted from a tuple. + * + * Unfortunately the implicitly-Json Statistic wasn't named JsonStatistic. + * + * @see Aggregations + */ +public enum Statistic2 implements UnivariateAggregate { + + /** + * Calculate the number of elements in the collection. + * The value is represented as a {@code double} + * with the key {@code COUNT} in the aggregate result. + */ + COUNT(new Count()), + /** + * Calculate the arithmetic mean. + * The mean value is represented as a {@code double} + * with the key {@code MEAN} in the aggregate result. + */ + MEAN(new Mean()), + /** + * Calculate the minimum. + * The minimum value is represented as a {@code double} + * with the key {@code MIN} in the aggregate result. + */ + MIN(new Min()), + /** + * Calculate the maximum. + * The maximum value is represented as a {@code double} + * with the key {@code MAX} in the aggregate result. + */ + MAX(new Max()), + /** + * Calculate the sum. + * The sum is represented as a {@code double} + * with the key {@code SUM} in the aggregate result. + */ + SUM(new Sum()), + /** + * Calculate the standard deviation. 
+ */ + STDDEV(new StandardDeviation()); + + private final StorelessUnivariateStatistic statImpl; + + private Statistic2(StorelessUnivariateStatistic statImpl) { + this.statImpl = statImpl; + statImpl.clear(); + } + + /** + * Return a new instance of this statistic implementation. + * N.B. must call clear(n) before using the result. + * @return A new instance of this statistic implementation. + */ + @Override + public UnivariateAggregator get() { + return new StorelessStatistic(this, statImpl.copy()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/StorelessStatistic.java ---------------------------------------------------------------------- diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/StorelessStatistic.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/StorelessStatistic.java new file mode 100644 index 0000000..a0e9bc6 --- /dev/null +++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/StorelessStatistic.java @@ -0,0 +1,58 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.edgent.analytics.math3.stat; + +import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic; +import org.apache.edgent.analytics.math3.UnivariateAggregate; +import org.apache.edgent.analytics.math3.UnivariateAggregator; + +/** + * Univariate aggregator implementation wrapping a {@code StorelessUnivariateStatistic}. + */ +public class StorelessStatistic implements UnivariateAggregator { + + private final Statistic2 stat; + private final StorelessUnivariateStatistic statImpl; + + public StorelessStatistic(Statistic2 stat, StorelessUnivariateStatistic statImpl) { + this.stat = stat; + this.statImpl = statImpl; + } + + @Override + public UnivariateAggregate getAggregate() { + return stat; + } + + @Override + public void clear(int n) { + statImpl.clear(); + } + + @Override + public void increment(double v) { + statImpl.increment(v); + } + + @Override + public double getResult() { + return statImpl.getResult(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/Statistics2Test.java ---------------------------------------------------------------------- diff --git a/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/Statistics2Test.java b/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/Statistics2Test.java new file mode 100644 index 0000000..6e463a2 --- /dev/null +++ b/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/Statistics2Test.java @@ -0,0 +1,460 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.edgent.test.analytics.math3; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.edgent.analytics.math3.Aggregations; +import org.apache.edgent.analytics.math3.Aggregations.MvResultMap; +import org.apache.edgent.analytics.math3.Aggregations.ResultMap; +import org.apache.edgent.analytics.math3.UnivariateAggregate; +import org.apache.edgent.analytics.math3.stat.Regression2; +import org.apache.edgent.analytics.math3.stat.Statistic2; +import org.apache.edgent.function.Functions; +import org.apache.edgent.test.providers.direct.DirectTestSetup; +import org.apache.edgent.test.topology.TopologyAbstractTest; +import org.apache.edgent.topology.TStream; +import org.apache.edgent.topology.TWindow; +import org.apache.edgent.topology.Topology; +import org.apache.edgent.topology.tester.Condition; +import org.junit.Test; + +import com.google.gson.JsonObject; + +/** test Aggregations, Statistics2 and Regression2 */ +public class Statistics2Test extends TopologyAbstractTest implements DirectTestSetup { + + // Expected results for **continuous last-2** aggregations of (1, 4, 102, 0) for the various stats. 
+ protected static Map<UnivariateAggregate,Double[]> STAT_RESULTS = new HashMap<>(); + static { + STAT_RESULTS.put( + Statistic2.COUNT, + new Double[] { + 1.0, // 1 + 2.0, // 1,4 + 2.0, // 4,102 + 2.0 // 102,0 + }); + STAT_RESULTS.put( + Statistic2.MIN, + new Double[] { + 1.0, // 1 + 1.0, // 1,4 + 4.0, // 4,102 + 0.0 // 102,0 + }); + STAT_RESULTS.put( + Statistic2.MAX, + new Double[] { + 1.0, // 1 + 4.0, // 1,4 + 102.0, // 4,102 + 102.0 // 102,0 + }); + STAT_RESULTS.put( + Statistic2.SUM, + new Double[] { + 1.0, // 1 + 5.0, // 1,4 + 106.0, // 4,102 + 102.0 // 102,0 + }); + STAT_RESULTS.put( + Statistic2.MEAN, + new Double[] { + 1.0, // 1 + 2.5, // 1,4 + 53.0, // 4,102 + 51.0 // 102,0 + }); + STAT_RESULTS.put( + Statistic2.STDDEV, + new Double[] { + 0.0, // 1 + 2.12, // 1,4 + 69.29, // 4,102 + 72.12 // 102,0 + }); + STAT_RESULTS.put( + Regression2.SLOPE, + new Double[] { + null, // 1 + 3.0, // 1,4 + 98.0, // 4,102 + -102.0 // 102,0 + }); + } + + /* test Aggregations.sum() */ + @Test + public void testSum() throws Exception { + Double act = Aggregations.sum(Arrays.asList(1, 2, 3.8)); // check with mixed Number + Double exp = 1 + 2 + 3.8; + assertEquals(exp, act, 0.01); + + act = Aggregations.sum(Arrays.asList()); + exp = 0.0; + assertEquals(exp, act, 0.01); + } + + /* test Aggregations.sumInts() */ + @Test + public void testSumInts() throws Exception { + long act = Aggregations.sumInts(Arrays.asList(1, 2, 3)); + long exp = 1 + 2 + 3; + assertEquals(exp, act); + + act = Aggregations.sumInts(Arrays.asList()); + exp = 0; + assertEquals(exp, act); + } + + /* util to test Aggregations.aggregate(List<Number>, stat) */ + protected void aggregate(UnivariateAggregate stat, Double[] expResults) throws Exception { + + // for "continuous last-2" style inputs of (1, 4, 102, 0) + + int i = 0; + Double result; + + result = Aggregations.aggregate(Arrays.asList(1), stat); + assertResult(i, stat, expResults[i++], result); + + result = Aggregations.aggregate(Arrays.asList(1,4), stat); 
+ assertResult(i, stat, expResults[i++], result); + + result = Aggregations.aggregate(Arrays.asList(4,102), stat); + assertResult(i, stat, expResults[i++], result); + + result = Aggregations.aggregate(Arrays.asList(102,0), stat); + assertResult(i, stat, expResults[i++], result); + } + + /* util to test Aggregations.aggregate(List<T>, getter, stat) */ + protected void aggregateT(UnivariateAggregate stat, Double[] expResults) throws Exception { + + // for "continuous last-2" style inputs of (1, 4, 102, 0) + + int i = 0; + Double result; + List<Object> values; + + values = Arrays.asList(1); + result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), stat); + assertResult(i, stat, expResults[i++], result); + + values = Arrays.asList(1,4); + result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), stat); + assertResult(i, stat, expResults[i++], result); + + values = Arrays.asList(4,102); + result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), stat); + assertResult(i, stat, expResults[i++], result); + + values = Arrays.asList(102,0); + result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), stat); + assertResult(i, stat, expResults[i++], result); + } + + /* util to test Aggregations.aggregateN(List<Number>, stats) */ + protected void aggregateN(UnivariateAggregate[] stats) throws Exception { + + // for "continuous last-2" style inputs of (1, 4, 102, 0) + + int i = 0; + ResultMap result; + + result = Aggregations.aggregateN(Arrays.asList(1), stats); + assertResult(i++, stats, STAT_RESULTS, result); + + result = Aggregations.aggregateN(Arrays.asList(1,4), stats); + assertResult(i++, stats, STAT_RESULTS, result); + + result = Aggregations.aggregateN(Arrays.asList(4,102), stats); + assertResult(i++, stats, STAT_RESULTS, result); + + result = Aggregations.aggregateN(Arrays.asList(102,0), stats); + assertResult(i++, stats, STAT_RESULTS, result); + } + + /* util to test Aggregations.aggregateN(List<T>, 
getter, stats) */ + protected void aggregateNT(UnivariateAggregate[] stats) throws Exception { + + // for "continuous last-2" style inputs of (1, 4, 102, 0) + + int i = 0; + ResultMap result; + List<Object> values; + + values = Arrays.asList(1); + result = Aggregations.aggregateN(values, o -> Double.valueOf(o.toString()), stats); + assertResult(i++, stats, STAT_RESULTS, result); + + values = Arrays.asList(1,4); + result = Aggregations.aggregateN(values, o -> Double.valueOf(o.toString()), stats); + assertResult(i++, stats, STAT_RESULTS, result); + + values = Arrays.asList(4,102); + result = Aggregations.aggregateN(values, o -> Double.valueOf(o.toString()), stats); + assertResult(i++, stats, STAT_RESULTS, result); + + values = Arrays.asList(102,0); + result = Aggregations.aggregateN(values, o -> Double.valueOf(o.toString()), stats); + assertResult(i++, stats, STAT_RESULTS, result); + } + + protected static void assertResult(int index, UnivariateAggregate stat, Double exp, Double act) { + if (exp == null && act == null) + return; // match + assertFalse("index="+index+" "+stat.name()+" exp="+exp+" act="+act, exp == null || act == null); + assertEquals("index="+index+" "+stat.name(), exp, act, 0.01); + } + + protected static void assertResult(int i, UnivariateAggregate[] stats, Map<UnivariateAggregate,Double[]> expResults, ResultMap actResults) { + for (UnivariateAggregate stat : stats) { + Double act = actResults.get(stat); + Double exp = expResults.get(stat)[i]; + + assertResult(i, stat, exp, act); + } + } + + /* util to test Aggregations.aggregate(List<Number>, stat) */ + protected void testAggregate(UnivariateAggregate stat) throws Exception { + aggregate(stat, STAT_RESULTS.get(stat)); + } + + @Test + public void testCOUNT() throws Exception { + testAggregate(Statistic2.COUNT); + } + + @Test + public void testMIN() throws Exception { + testAggregate(Statistic2.MIN); + } + + @Test + public void testMAX() throws Exception { + testAggregate(Statistic2.MAX); + } + + 
@Test + public void testSUM() throws Exception { + testAggregate(Statistic2.SUM); + } + + @Test + public void testMEAN() throws Exception { + testAggregate(Statistic2.MEAN); + } + + @Test + public void testSTDDEV() throws Exception { + testAggregate(Statistic2.STDDEV); + } + + @Test + public void testSLOPE() throws Exception { + testAggregate(Regression2.SLOPE); + } + + @Test + public void testEmptyAggregate() throws Exception { + Double result; + + result = Aggregations.aggregate(Collections.emptyList(), Statistic2.COUNT); + assertResult(0, Statistic2.COUNT, null, result); + + result = Aggregations.aggregate(Collections.emptyList(), Statistic2.MIN); + assertResult(0, Statistic2.MIN, null, result); + + // ... + + ResultMap resultMap = Aggregations.aggregateN(Collections.emptyList(), Statistic2.MIN); + assertTrue(resultMap.isEmpty()); + + resultMap = Aggregations.aggregateN(Arrays.asList(1)); + assertTrue(resultMap.isEmpty()); + + resultMap = Aggregations.aggregateN(Arrays.asList(1), new UnivariateAggregate[0]); + assertTrue(resultMap.isEmpty()); + } + + @Test + public void testAggregateT() throws Exception { + aggregateT(Statistic2.MIN, STAT_RESULTS.get(Statistic2.MIN)); + } + + /* test Aggregations.aggregateN(List<Number>, stats) */ + @Test + public void testAggregateN() throws Exception { + aggregateN(STAT_RESULTS.keySet().toArray(new UnivariateAggregate[0])); + } + + /* test Aggregations.aggregateN(List<T>, getter, stats) */ + @Test + public void testAggregateNT() throws Exception { + aggregateNT(STAT_RESULTS.keySet().toArray(new UnivariateAggregate[0])); + } + + /* test Aggregations.newResultsToJson() */ + @Test + public void testNewResultsToJson() throws Exception { + ResultMap result = Aggregations.newResults(); + result.put(Statistic2.MIN, 2.5); + result.put(Statistic2.MAX, 4.5); + + JsonObject jo = Aggregations.newResultsToJson().apply(result); + + assertTrue(jo.get(Statistic2.MIN.name()) != null); + assertEquals(jo.get(Statistic2.MIN.name()).getAsDouble(), 
2.5, 0.01); + assertTrue(jo.get(Statistic2.MAX.name()) != null); + assertEquals(jo.get(Statistic2.MAX.name()).getAsDouble(), 4.5, 0.01); + } + + /* test Aggregations.newMvResultsToJson() */ + @Test + public void testNewMvResultsToJson() throws Exception { + ResultMap var1result = Aggregations.newResults(); + var1result.put(Statistic2.MIN, 2.5); + var1result.put(Statistic2.MAX, 4.5); + + ResultMap var2result = Aggregations.newResults(); + var2result.put(Statistic2.SUM, 27.1); + + MvResultMap result = Aggregations.newMvResults(); + result.put("var1", var1result); + result.put("var2", var2result); + + JsonObject jo = Aggregations.newMvResultsToJson().apply(result); + + assertTrue(jo.get("var1") != null); + JsonObject joVar1 = jo.get("var1").getAsJsonObject(); + assertTrue(joVar1.get(Statistic2.MIN.name()) != null); + assertEquals(joVar1.get(Statistic2.MIN.name()).getAsDouble(), 2.5, 0.01); + assertTrue(joVar1.get(Statistic2.MAX.name()) != null); + assertEquals(joVar1.get(Statistic2.MAX.name()).getAsDouble(), 4.5, 0.01); + + assertTrue(jo.get("var2") != null); + JsonObject joVar2 = jo.get("var2").getAsJsonObject(); + assertTrue(joVar2.get(Statistic2.SUM.name()) != null); + assertEquals(joVar2.get(Statistic2.SUM.name()).getAsDouble(), 27.1, 0.01); + } + + /* test Aggregations.aggregateN(list, stats) in a Stream/Window context */ + @Test + public void testAggregateNStream() throws Exception { + // Aggregations.aggregate are ignorant of Stream/Window but + // to be safe verify a full Stream/Window based use case works + + UnivariateAggregate[] stats = STAT_RESULTS.keySet().toArray(new UnivariateAggregate[0]); + + Topology topology = newTopology("testAggregateNStream"); + + // (1, 4, 102, 0) + TStream<Integer> sourceData = sourceData(topology); + + TWindow<Integer, Integer> window = sourceData.last(2, Functions.unpartitioned()); + + TStream<ResultMap> aggregate = window.aggregate( (list,partition) -> { + return Aggregations.aggregateN(list, stats); + }); + + Condition<Long> 
count = topology.getTester().atLeastTupleCount(aggregate, 4); + Condition<List<ResultMap>> contents = topology.getTester().streamContents(aggregate); + complete(topology, count); + assertTrue(count.valid()); + + List<ResultMap> tuples = contents.getResult(); + assertEquals(4, tuples.size()); + + for (int i = 0; i < tuples.size(); i++) { + assertResult(i, stats, STAT_RESULTS, tuples.get(i)); + } + } + + /* test Aggregations.aggregateN(list, stats) in a multivariable Stream/Window context */ + @Test + public void testMvAggregateNStream() throws Exception { + // Aggregations.aggregate are ignorant of Stream/Window but + // to be safe verify a full Stream/Window based use case works (TStream<MvResultMap>) + + UnivariateAggregate[] stats = { Statistic2.MIN, Statistic2.MAX }; + + Map<UnivariateAggregate,Double[]> var2_STAT_RESULTS = new HashMap<>(); + var2_STAT_RESULTS.put(Statistic2.MIN, new Double[] {1001.0, 1001.0, 1004.0, 1000.0}); + var2_STAT_RESULTS.put(Statistic2.MAX, new Double[] {1001.0, 1004.0, 1102.0, 1102.0}); + + + Topology topology = newTopology("testAggregateNStream"); + + // (1, 4, 102, 0) + TStream<SensorReadings> sourceData = sourceData(topology) + .map(i -> new SensorReadings(i, i+1000)); + + TWindow<SensorReadings, Integer> window = sourceData.last(2, Functions.unpartitioned()); + + TStream<MvResultMap> aggregate = window.aggregate( (list,partition) -> { + ResultMap var1result = Aggregations.aggregateN(list, sr -> sr.var1, stats); + ResultMap var2result = Aggregations.aggregateN(list, sr -> sr.var2, stats); + MvResultMap result = Aggregations.newMvResults(); + result.put("var1", var1result); + result.put("var2", var2result); + return result; + }); + + Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 4); + Condition<List<MvResultMap>> contents = topology.getTester().streamContents(aggregate); + complete(topology, count); + assertTrue(count.valid()); + + List<MvResultMap> tuples = contents.getResult(); + assertEquals(4, 
tuples.size()); + + for (int i = 0; i < tuples.size(); i++) { + MvResultMap results = tuples.get(i); + assertResult(i, stats, STAT_RESULTS, results.get("var1")); + assertResult(i, stats, var2_STAT_RESULTS, results.get("var2")); + } + } + + protected static class SensorReadings { + int var1; + int var2; + SensorReadings(int var1, int var2) { + this.var1 = var1; + this.var2 = var2; + } + } + + protected static TStream<Integer> sourceData(Topology topology) + { + return topology.of(1, 4, 102, 0); + } + +}
