Repository: incubator-edgent
Updated Branches:
  refs/heads/master 9e4934ea4 -> ee9ac2845


[Edgent-409] add analytics.math3.Aggregations

- analytics.math3
  - add Aggregations
  - add UnivariateAggregate (general math3.json.JsonUniv...)
  - add UnivariateAggregator (general math3.json.JsonUniv...)
  - add package-info
- analytics.math3.stats - add tuple-type-independent stats
  - add Statistics2 (general Statistics)
  - add Regression2 (general Regression)
  - add StorelessStatistic (general JsonStorelessStatistic)
  - add package scoped Count
  - add package scoped OLS (general JsonOLS)
- add Statistics2Test

Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/99e9f151
Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/99e9f151
Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/99e9f151

Branch: refs/heads/master
Commit: 99e9f1519ddf900c996449d74c3867db099dbbce
Parents: 9e4934e
Author: Dale LaBossiere <[email protected]>
Authored: Thu Apr 20 11:35:41 2017 -0400
Committer: Dale LaBossiere <[email protected]>
Committed: Thu Apr 20 11:39:57 2017 -0400

----------------------------------------------------------------------
 .../edgent/analytics/math3/Aggregations.java    | 313 +++++++++++++
 .../analytics/math3/UnivariateAggregate.java    |  48 ++
 .../analytics/math3/UnivariateAggregator.java   |  47 ++
 .../analytics/math3/json/JsonAnalytics.java     |   2 +
 .../edgent/analytics/math3/package-info.java    |  24 +
 .../edgent/analytics/math3/stat/Count.java      |  40 ++
 .../apache/edgent/analytics/math3/stat/OLS.java |  80 ++++
 .../analytics/math3/stat/Regression2.java       |  56 +++
 .../edgent/analytics/math3/stat/Statistic2.java |  94 ++++
 .../math3/stat/StorelessStatistic.java          |  58 +++
 .../test/analytics/math3/Statistics2Test.java   | 460 +++++++++++++++++++
 11 files changed, 1222 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/Aggregations.java
----------------------------------------------------------------------
diff --git 
a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/Aggregations.java
 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/Aggregations.java
new file mode 100644
index 0000000..4f037ad
--- /dev/null
+++ 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/Aggregations.java
@@ -0,0 +1,313 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3;
+
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.edgent.analytics.math3.json.JsonAnalytics;
+import org.apache.edgent.analytics.math3.stat.Regression2;
+import org.apache.edgent.analytics.math3.stat.Statistic2;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.ToDoubleFunction;
+import org.apache.edgent.topology.TWindow;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+/**
+ * Apache Common Math analytics for Collections.
+ * 
+ * <p>These operations are typically used when processing a collection of 
tuples
+ * in a non-partitioned or partitioned {@link TWindow}.
+ * 
+ * <p>Simple sum aggregation functions are also provided (helpful given the 
avoidance
+ * of depending on Java8 Streams).
+ * 
+ * <p>Example: compute a batched MEAN aggregation for a simple unpartitioned 
window of
+ * numeric {@code TStream<Double>} where the desired result is a {@code 
TStream<Double>} of
+ * the MEAN values:
+ * <pre>{@code
+ *  TStream<Double> pressureReadings = ...
+ *TWindow<Double,Integer> window = pressureReadings.last(5, 
Functions.unpartitioned());
+ *TStream<Double> meanPressureReadings = window.batch(
+ *      (list, partition) -> Aggregations.aggregate(list, Statistic2.MEAN));
+ * }</pre>
+ * 
+ * <p>Example: compute the MEAN and SLOPE, capturing the results in a {@link 
ResultMap}:
+ * <pre>{@code
+ *  TStream<Double> pressureReadings = ...
+ *TWindow<Double,Integer> window = pressureReadings.last(5, 
Functions.unpartitioned());
+ *TStream<ResultMap> meanPressureReadings = window.batch(
+ *      (list, partition) -> Aggregations.aggregateN(list, Statistic2.MEAN, 
Regression2.SLOPE));
+ * }</pre>
+ * 
+ * <p>Example: compute multiple aggregations on multiple variables in a tuple 
T,
+ * capturing the results in a {@link MvResultMap}:
+ * <pre>{@code
+ *class SensorReading { 
+ *  double getTemp() {...&#125;;
+ *  double getPressure() {...&#125;;
+ *  ... &#125;;
+ *TStream&lt;SensorReading> readings = ...
+ *TWindow&lt;SensorReading,Integer> window = sensorReadings.last(5, 
Functions.unpartitioned());
+ *TStream&lt;MvResultMap> aggregations = window.batch(
+ *      (list, partition) -> {
+ *        ResultMap pressureResults = Aggregations.aggregateN(list, t -> 
t.getPressure(), Statistic2.MEAN, Regression2.SLOPE));
+ *        ResultMap tempResults = Aggregations.aggregateN(list, t -> 
t.getTemp(), Statistic2.MAX));
+ *        MvResultMap results = Aggregations.newMvResults();
+ *        results.put("pressure", pressureResults);
+ *        results.put("temp", tempResults);
+ *        return results;
+ *      &#125;);
+ * }</pre>
+ *
+ * <p>Example: convert a {@code TStream<ResultMap>} or {@code 
TStream<MvResultMap>} to a JsonObject:
+ * 
+ * <pre>{@code
+ *   TStream<ResultMap> resultMap = ...
+ * TStream<JsonObject> joResultMap = 
resultMap.map(Aggregations.newResultsToJsonFn());
+ * }</pre>
+ * 
+ * 
+ * <p>Background: 
+ * {@link JsonAnalytics} predates this class.  Use of JsonAnalytics for 
computing
+ * Apache Commons Math aggregations requires:
+ * <ul>
+ * <li>the input stream tuple type must be a JsonObject and the user must 
identify
+ *     the property that contains the value to be aggregated.</li>
+ * <li>batched window aggregations are not supported (they could be 
added).</li>
+ * <li>the aggregation result tuple is a JsonObject that holds a user-specified
+ *     property for a window partition key and a user-specified property
+ *     for a JsonObject holding the UnivariateAggregation results by name 
(e.g., "MEAN").
+ *     Accessing a particular aggregation results in the JsonObject
+ *     is somewhat cumbersome.</li>
+ * </ul>
+ * 
+ * <p>As a result, using JsonAnalytics for simple cases can be a bit 
unintuitive and cumbersome.  
+ * 
+ * <p>For example, to JsonAnalytics for a simple case of a continuous 
aggregation
+ * of {@code TStream<Double>} => {@code TStream<Double>} of MEAN values:
+ * 
+ * <pre>{@code
+ *  TStream<Double> pressureReadings = ...
+ * 
+ * // transform TStream<Double> to TStream<JsonObject>
+ * TStream<JsonObject> joPressureReadings = 
pressureReadings.map(JsonFunctions.valueOfDouble("pressure"));
+ * 
+ * // aggregate
+ * TWindow<JsonObject,JsonElement> window = joPressureReadings.last(5, 
JsonFunctions.unpartitioned());
+ * TStream<JsonObject> results = JsonAnalytics.aggregate(window, "partition", 
"pressure", Statistic.MEAN);
+ *  
+ * // transform to TStream<Double> mean results
+ * TStream<Double> meanPressureReadings = results.map(jo -> 
jo.get("pressure").getAsObject().get("MEAN").getAsDouble());
+ * }</pre>
+ * 
+ * <p>Using Aggregations it's simply:
+ * 
+ * <pre>{@code
+ *TWindow<Double,Integer> window = pressureReadings.last(5, 
Functions.unpartitioned());
+ *TStream<Double> meanPressureReadings = window.aggregate(
+ *      (list, partition) -> Aggregations.aggregate(list, Statistic2.MEAN));
+ * }</pre>
+ * 
+ * 
+ * @see Statistic2
+ * @see Regression2
+ * @see JsonAnalytics
+ */
+public class Aggregations {
+  
+  /** 
+   * Perform a sum of numbers treated as double values, accumulating in a 
double.
+   * An empty list yields a 0.0 result.
+   * @param list numbers to sum
+   * @return the sum
+   */
+  public static Double sum(Collection<? extends Number> list) {
+    double sum = 0.0;
+    for (Number v : list) 
+      sum += v.doubleValue();
+    return sum;
+  }
+  
+  /** 
+   * Perform a sum of numbers treated as long values, accumulating in a long.
+   * More efficient than sum() for non-floating point values.
+   * An empty list yields a 0 result.
+   * @param list numbers to sum
+   * @return the sum
+   */
+  public static long sumInts(Collection<? extends Number> list) {
+    long sum = 0;
+    for (Number v : list) 
+      sum += v.longValue();
+    return sum;
+  }
+  
+  /**
+   * Aggregation results for a single aggregated variable.
+   */
+  public static class ResultMap extends HashMap<UnivariateAggregate,Double> {
+    private static final long serialVersionUID = 1L;
+  }
+  
+  /**
+   * Create a new empty {@link ResultMap}.
+   * @return the ResultMap.
+   */
+  public static ResultMap newResults() { return new ResultMap(); }
+
+  /**
+   * Create a {@link Function} whose {@code apply(ResultMap)} converts the 
value
+   * to a {@code JsonObject}.  The ResultMap's key's names are the JsonObject 
property
+   * names and the property value is the key's value.
+   * 
+   * <p>An example resulting JsonObject would be <pre>{ "MEAN":3.75, "MIN":2.0 
}</pre>.
+   * @return the JsonObject
+   */
+  public static Function<ResultMap,JsonObject> newResultsToJson() {
+    Gson gson = new Gson();
+    return (ResultMap resultMap) -> 
gson.toJsonTree(resultMap).getAsJsonObject();
+  }
+
+  /**
+   * Aggregation results for a multiple aggregated variables.
+   * <p>The name of the aggregated variable is typically the key for the 
variable's {@link ResultMap}.
+   */
+  public static class MvResultMap extends HashMap<String,ResultMap> {
+    private static final long serialVersionUID = 1L;
+  };
+  
+  /**
+   * Create a new empty {@link MvResultMap}.
+   * @return the MvResultMap.
+   */
+  public static MvResultMap newMvResults() { return new MvResultMap(); }
+
+  /**
+   * Create a {@link Function} whose {@code apply(MvResultMap)} converts the 
value
+   * to a {@code JsonObject}.  The MvResultMap's key's names are the 
JsonObject property
+   * names and the property value is the JsonObject for the key's ResultMap 
value.
+   * 
+   * <p>An example resulting JsonObject would be 
+   * <pre>{ "temperature":{"MEAN":123.75, "MAX":180.5}, 
"pressure":{"MAX":13.0} }</pre>.
+   * 
+   * @return the JsonObject
+   */
+  public static Function<MvResultMap,JsonObject> newMvResultsToJson() {
+    Gson gson = new Gson();
+    return (MvResultMap mvResultMap) -> 
gson.toJsonTree(mvResultMap).getAsJsonObject();
+  }
+
+  /**
+   * Perform the specified {@link UnivariateAggregate} on a Collection of 
{@link Number}.
+   * 
+   * <p>A null result is returned if the collection is empty.
+   * An aggregation result may be null under other conditions,
+   * e.g., a Regression2.SLOPE where the minimum number of samples has not 
been met.
+   * 
+   * @param c the Collection to aggregate
+   * @param aggregate the aggregation to perform
+   * @return the aggregation result, may be null.
+   */
+  public static Double aggregate(Collection<? extends Number> c, 
UnivariateAggregate aggregate) {
+    return aggregateN(c, aggregate).get(aggregate);
+  }
+  
+  /**
+   * Perform the specified {@link UnivariateAggregate} on a Collection of 
{@link Number}.
+   *  
+   * <p>If the collection is empty an empty ResultMap is returned.
+   * The ResultMap does not contain an entry for an aggregation with a null,
+   * e.g., a Regression2.SLOPE where the minimum number of samples has not 
been met.
+   * 
+   * @param c the Collection to aggregate
+   * @param aggregates the aggregations to perform
+   * @return a {@link ResultMap} containing the variable's aggregation results
+   */
+  public static ResultMap aggregateN(Collection<? extends Number> c, 
UnivariateAggregate... aggregates) {
+    return aggregateN(c, num -> num.doubleValue(), aggregates);
+  }
+  
+  /**
+   * Perform the specified {@link UnivariateAggregate} a Collection of {@code 
T}
+   * using the specified {@link ToDoubleFunction getter} to extract the
+   * variable to aggregate.
+   * 
+   * <p>A null result is returned if the collection is empty.
+   * An aggregation result may be null under other conditions,
+   * e.g., a Regression2.SLOPE where the minimum number of samples has not 
been met.
+   * 
+   * @param c the Collection to aggregate
+   * @param getter function that returns the variable to aggregate from a 
{@code T}
+   * @param aggregate the aggregation to perform
+   * @return the aggregation result, may be null.
+   */
+  public static <T> Double aggregate(Collection<T> c, ToDoubleFunction<T> 
getter, UnivariateAggregate aggregate) {
+    return aggregateN(c, getter, aggregate).get(aggregate);
+  }
+
+  /**
+   * Perform the specified {@link UnivariateAggregate} a Collection of {@code 
T}
+   * using the specified {@link ToDoubleFunction getter} to extract the
+   * variable to aggregate.
+   * 
+   * <p>If the collection is empty an empty ResultMap is returned.
+   * The ResultMap does not contain an entry for an aggregation with a null,
+   * e.g., a Regression2.SLOPE where the minimum number of samples has not 
been met.
+   * 
+   * @param c the Collection to aggregate
+   * @param getter function that returns the variable to aggregate from a 
{@code T}
+   * @param aggregates the aggregations to perform
+   * @return a {@link ResultMap} containing the variable's aggregation results
+   */
+  public static <T> ResultMap aggregateN(Collection<T> c, ToDoubleFunction<T> 
getter, UnivariateAggregate... aggregates) {
+
+    final int n = c.size();
+    final ResultMap result = newResults();
+    
+    if (n != 0) {
+      // get new UnivariateAggregate instances for this aggregation
+      final UnivariateAggregator[] aggregators = new 
UnivariateAggregator[aggregates.length];
+      for (int i = 0; i < aggregates.length; i++) {
+          aggregators[i] = aggregates[i].get();
+      }
+      for (UnivariateAggregator agg : aggregators) {
+          agg.clear(n);
+      }
+      for (T value : c) {
+          Double d = getter.applyAsDouble(value);
+          for (UnivariateAggregator agg : aggregators) {
+              agg.increment(d);
+          }
+      }
+      for (UnivariateAggregator agg : aggregators) {
+          // do as JsonAnalytics did and omit Nan/Inf results from the map.
+          double rv = agg.getResult();
+          
+          if (Double.isFinite(rv))
+              result.put(agg.getAggregate(), Double.valueOf(rv));
+      }
+    }
+
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregate.java
----------------------------------------------------------------------
diff --git 
a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregate.java
 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregate.java
new file mode 100644
index 0000000..903ee7f
--- /dev/null
+++ 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregate.java
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3;
+
+import org.apache.edgent.function.Supplier;
+
+/**
+ * Univariate aggregate for a tuple.
+ * This is the declaration of the aggregate that
+ * application use when declaring a topology.
+ * <P>
+ * Implementations are typically enums such
+ * as {@link org.apache.edgent.analytics.math3.stat.Statistic2 Statistic2}.
+ * </P>
+ * <P>
+ * Each call to {@code get()} must return a new
+ * {@link UnivariateAggregator aggregator}
+ * that implements the required aggregate.
+ * </P>
+ * 
+ * @see Aggregations
+ */
+public interface UnivariateAggregate extends Supplier<UnivariateAggregator>{
+    
+    /**
+     * Name of the aggregate.
+     * The returned value is used as the JSON key containing
+     * the result of the aggregation.
+     * @return Name of the aggregate.
+     */
+    public String name();
+}

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregator.java
----------------------------------------------------------------------
diff --git 
a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregator.java
 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregator.java
new file mode 100644
index 0000000..52a1730
--- /dev/null
+++ 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/UnivariateAggregator.java
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3;
+
+/**
+ * Univariate aggregator for tuples.
+ * This is the runtime implementation interface
+ * of {@link UnivariateAggregate} defined aggregate.
+ */
+public interface UnivariateAggregator {
+  
+    UnivariateAggregate getAggregate();
+    
+    /**
+     * Clear the aggregator to prepare for a new aggregation.
+     * @param n Number of tuples to be aggregated.
+     */
+    void clear(int n);
+    
+    /**
+     * Add a value to the aggregation. 
+     * @param value Value to be added.
+     */
+    void increment(double value);
+    
+    /**
+     * Get the aggregation result.  It may be a NaN (empty collection, etc) or 
Infinite value.
+     * @return the result
+     */
+    double getResult();
+}

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java
----------------------------------------------------------------------
diff --git 
a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java
 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java
index c6e9108..cfe1a78 100644
--- 
a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java
+++ 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java
@@ -21,6 +21,7 @@ package org.apache.edgent.analytics.math3.json;
 import java.util.List;
 
 import org.apache.commons.math3.util.Pair;
+import org.apache.edgent.analytics.math3.Aggregations;
 import org.apache.edgent.function.BiFunction;
 import org.apache.edgent.function.ToDoubleFunction;
 import org.apache.edgent.topology.TStream;
@@ -32,6 +33,7 @@ import com.google.gson.JsonObject;
 /**
  * Apache Common Math analytics for streams with JSON tuples.
  *
+ * @see Aggregations
  */
 public class JsonAnalytics {
     

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/package-info.java
----------------------------------------------------------------------
diff --git 
a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/package-info.java
 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/package-info.java
new file mode 100644
index 0000000..e31b29b
--- /dev/null
+++ 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Analytics using Apache Commons Math.
+ */
+package org.apache.edgent.analytics.math3;
+

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Count.java
----------------------------------------------------------------------
diff --git 
a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Count.java
 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Count.java
new file mode 100644
index 0000000..102f507
--- /dev/null
+++ 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Count.java
@@ -0,0 +1,40 @@
+package org.apache.edgent.analytics.math3.stat;
+
+import 
org.apache.commons.math3.stat.descriptive.AbstractStorelessUnivariateStatistic;
+import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+
+/**
+ * The number of items in the collection being aggregated.
+ * 
+ * Need this semi-hack to be able to capture the number
+ * of items in an aggregation in a ResultMap.
+ */
+class Count extends AbstractStorelessUnivariateStatistic {
+  int n;
+
+  @Override
+  public long getN() {
+    return n;
+  }
+
+  @Override
+  public void clear() {
+    n = 0;
+  }
+
+  @Override
+  public StorelessUnivariateStatistic copy() {
+    return new Count();
+  }
+
+  @Override
+  public double getResult() {
+    return n;
+  }
+
+  @Override
+  public void increment(double arg0) {
+    n++;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/OLS.java
----------------------------------------------------------------------
diff --git 
a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/OLS.java 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/OLS.java
new file mode 100644
index 0000000..c798101
--- /dev/null
+++ 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/OLS.java
@@ -0,0 +1,80 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression;
+import org.apache.edgent.analytics.math3.UnivariateAggregate;
+import org.apache.edgent.analytics.math3.UnivariateAggregator;
+
+/**
+ * Wrapper over commons math3 {@code OLSMultipleLinearRegression}
+ */
+class OLS implements UnivariateAggregator {
+    
+    private final Regression2 type;
+    private final OLSMultipleLinearRegression ols = new 
OLSMultipleLinearRegression();
+    private double[] values;
+    private int yOffset;
+    
+    OLS(Regression2 type) {
+        this.type = type;
+    }
+
+    @Override
+    public UnivariateAggregate getAggregate() {
+      return type;
+    }
+
+    @Override
+    public void clear(int n) {
+        values = new double[n*2];
+        yOffset = 0;
+    }
+
+    @Override
+    public void increment(double v) {  
+        values[yOffset] = v;
+        yOffset+=2;
+    }
+    
+    void setSampleData() {
+        // Fill  in the x values
+        for (int x = 0; x < values.length/2; x++)
+            values[(x*2)+1] = x;
+        ols.newSampleData(values, values.length/2, 1);
+    }
+    
+    @Override
+    public double getResult() {
+        // If there are no values or only a single
+        // value then we cannot calculate the slope.
+        if (values.length <= 2)
+            return Double.NaN;
+            
+        setSampleData();
+        double[] regressionParams = ols.estimateRegressionParameters();
+        if (regressionParams.length >= 2) {
+            // [0] is the constant (zero'th order)
+            // [1] is the first order , which we use as the slope.
+            final double slope = regressionParams[1];
+            return slope;
+        }
+        return Double.NaN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression2.java
----------------------------------------------------------------------
diff --git 
a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression2.java
 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression2.java
new file mode 100644
index 0000000..4524e95
--- /dev/null
+++ 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression2.java
@@ -0,0 +1,56 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.edgent.analytics.math3.UnivariateAggregate;
+import org.apache.edgent.analytics.math3.UnivariateAggregator;
+
+/**
+ * Univariate regression aggregates.
+ *
+ * Unfortunately the implicitly-Json Regression wasn't named JsonRegresson.
+ */
+public enum Regression2 implements UnivariateAggregate {
+    
+    /**
+     * Calculate the slope of a single variable.
+     * The slope is calculated using the first
+     * order of a ordinary least squares
+     * linear regression.
+     * The list of values for the single
+     * single variable are processed as Y-values
+     * that are evenly spaced on the X-axis.
+     * <BR>
+     * This is useful as a simple determination
+     * if the variable is increasing or decreasing.
+     * <BR>
+     * The slope value is represented as a {@code double}
+     * with the key {@code SLOPE} in the aggregate result.
+     * <BR>
+     * If the collection to be aggregated contains less than
+     * two values then no regression is performed 
+     * (an associated UnivariateAggregator.getResult() returns NaN).
+     */
+    SLOPE() {
+        @Override
+        public UnivariateAggregator get() {
+            return new OLS(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic2.java
----------------------------------------------------------------------
diff --git 
a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic2.java
 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic2.java
new file mode 100644
index 0000000..d731dac
--- /dev/null
+++ 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic2.java
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+import org.apache.commons.math3.stat.descriptive.moment.Mean;
+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
+import org.apache.commons.math3.stat.descriptive.rank.Max;
+import org.apache.commons.math3.stat.descriptive.rank.Min;
+import org.apache.commons.math3.stat.descriptive.summary.Sum;
+import org.apache.edgent.analytics.math3.Aggregations;
+import org.apache.edgent.analytics.math3.UnivariateAggregate;
+import org.apache.edgent.analytics.math3.UnivariateAggregator;
+
+/**
+ * Statistic implementations.
+ * 
+ * Univariate statistic aggregate calculations against a value
+ * extracted from a tuple.
+ * 
+ * Unfortunately the implicitly-Json Statistic wasn't named JsonStatistic.
+ * 
+ * @see Aggregations
+ */
+public enum Statistic2 implements UnivariateAggregate {
+    
+    /**
+     * Calculate the number of elements in the collection.
+     * The value is represented as a {@code double}
+     * with the key {@code COUNT} in the aggregate result.
+     */
+    COUNT(new Count()),
+    /**
+     * Calculate the arithmetic mean.
+     * The mean value is represented as a {@code double}
+     * with the key {@code MEAN} in the aggregate result.
+     */
+    MEAN(new Mean()),
+    /**
+     * Calculate the minimum.
+     * The minimum value is represented as a {@code double}
+     * with the key {@code MIN} in the aggregate result.
+     */
+    MIN(new Min()),
+    /**
+     * Calculate the maximum.
+     * The maximum value is represented as a {@code double}
+     * with the key {@code MAXIMUM} in the aggregate result.
+     */
+    MAX(new Max()),
+    /**
+     * Calculate the sum.
+     * The sum is represented as a {@code double}
+     * with the key {@code SUM} in the aggregate result.
+     */
+    SUM(new Sum()),
+    /**
+     * Calculate the standard deviation.
+     */
+    STDDEV(new StandardDeviation());
+
+    private final StorelessUnivariateStatistic statImpl;
+
+    private Statistic2(StorelessUnivariateStatistic statImpl) {
+        this.statImpl = statImpl;
+        statImpl.clear();
+    }
+
+    /**
+     * Return a new instance of this statistic implementation.
+     * N.B. must call clear(n) before using the result.
+     * @return A new instance of this statistic implementation.
+     */
+    @Override
+    public UnivariateAggregator get() {
+        return new StorelessStatistic(this, statImpl.copy());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/StorelessStatistic.java
----------------------------------------------------------------------
diff --git 
a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/StorelessStatistic.java
 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/StorelessStatistic.java
new file mode 100644
index 0000000..a0e9bc6
--- /dev/null
+++ 
b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/StorelessStatistic.java
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+import org.apache.edgent.analytics.math3.UnivariateAggregate;
+import org.apache.edgent.analytics.math3.UnivariateAggregator;
+
+/**
+ * Univariate aggregator implementation wrapping a {@code 
StorelessUnivariateStatistic}.
+ */
+public class StorelessStatistic implements UnivariateAggregator {
+        
+    private final Statistic2 stat;
+    private final StorelessUnivariateStatistic statImpl;
+    
+    public StorelessStatistic(Statistic2 stat, StorelessUnivariateStatistic 
statImpl) {
+        this.stat = stat;
+        this.statImpl = statImpl;
+    }
+
+    @Override
+    public UnivariateAggregate getAggregate() {
+      return stat;
+    }
+
+    @Override
+    public void clear(int n) {
+        statImpl.clear();
+    }
+
+    @Override
+    public void increment(double v) {
+        statImpl.increment(v);
+    }
+    
+    @Override
+    public double getResult() {
+      return statImpl.getResult();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/99e9f151/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/Statistics2Test.java
----------------------------------------------------------------------
diff --git 
a/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/Statistics2Test.java
 
b/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/Statistics2Test.java
new file mode 100644
index 0000000..6e463a2
--- /dev/null
+++ 
b/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/Statistics2Test.java
@@ -0,0 +1,460 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.analytics.math3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.edgent.analytics.math3.Aggregations;
+import org.apache.edgent.analytics.math3.Aggregations.MvResultMap;
+import org.apache.edgent.analytics.math3.Aggregations.ResultMap;
+import org.apache.edgent.analytics.math3.UnivariateAggregate;
+import org.apache.edgent.analytics.math3.stat.Regression2;
+import org.apache.edgent.analytics.math3.stat.Statistic2;
+import org.apache.edgent.function.Functions;
+import org.apache.edgent.test.providers.direct.DirectTestSetup;
+import org.apache.edgent.test.topology.TopologyAbstractTest;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TWindow;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+/** test Aggregations, Statistics2 and Regression2 */
+public class Statistics2Test  extends TopologyAbstractTest implements 
DirectTestSetup {
+  
+  // Expected results for **continuous last-2** aggregations of (1, 4, 102, 0) 
for the various stats.
+  protected static Map<UnivariateAggregate,Double[]> STAT_RESULTS = new 
HashMap<>();
+  static {
+    STAT_RESULTS.put(
+        Statistic2.COUNT,
+        new Double[] {
+            1.0,  // 1
+            2.0,  // 1,4  
+            2.0,  // 4,102
+            2.0   // 102,0
+        });
+    STAT_RESULTS.put(
+        Statistic2.MIN,
+        new Double[] {
+            1.0,  // 1
+            1.0,  // 1,4  
+            4.0,  // 4,102
+            0.0   // 102,0
+        });
+    STAT_RESULTS.put(
+        Statistic2.MAX,
+        new Double[] {
+            1.0,   // 1
+            4.0,   // 1,4  
+            102.0, // 4,102
+            102.0  // 102,0
+        });
+    STAT_RESULTS.put(
+        Statistic2.SUM,
+        new Double[] {
+            1.0,   // 1
+            5.0,   // 1,4  
+            106.0, // 4,102
+            102.0  // 102,0
+        });
+    STAT_RESULTS.put(
+        Statistic2.MEAN,
+        new Double[] {
+            1.0,   // 1
+            2.5,   // 1,4  
+            53.0,  // 4,102
+            51.0   // 102,0
+        });
+    STAT_RESULTS.put(
+        Statistic2.STDDEV,
+        new Double[] {
+            0.0,   // 1
+            2.12,  // 1,4  
+            69.29, // 4,102
+            72.12  // 102,0
+        });
+    STAT_RESULTS.put(
+        Regression2.SLOPE,
+        new Double[] {
+            null,  // 1
+            3.0,   // 1,4  
+            98.0,  // 4,102
+            -102.0 // 102,0
+        });
+  }
+  
+  /* test Aggregations.sum() */
+  @Test
+  public void testSum() throws Exception {
+    Double act = Aggregations.sum(Arrays.asList(1, 2, 3.8));  // check with 
mixed Number
+    Double exp = 1 + 2 + 3.8;
+    assertEquals(exp, act, 0.01);
+
+    act = Aggregations.sum(Arrays.asList());
+    exp = 0.0;
+    assertEquals(exp, act, 0.01);
+  }
+  
+  /* test Aggregations.sumInts() */
+  @Test
+  public void testSumInts() throws Exception {
+    long act = Aggregations.sumInts(Arrays.asList(1, 2, 3));
+    long exp = 1 + 2 + 3;
+    assertEquals(exp, act);
+    
+    act = Aggregations.sumInts(Arrays.asList());
+    exp = 0;
+    assertEquals(exp, act);
+  }
+  
+  /* util to test Aggregations.aggregate(List<Number>, stat) */
+  protected void aggregate(UnivariateAggregate stat, Double[] expResults) 
throws Exception {
+    
+    // for "continuous last-2" style inputs of (1, 4, 102, 0)
+         
+         int i = 0;
+         Double result;
+         
+    result = Aggregations.aggregate(Arrays.asList(1), stat);
+    assertResult(i, stat, expResults[i++], result);
+    
+    result = Aggregations.aggregate(Arrays.asList(1,4), stat);
+    assertResult(i, stat, expResults[i++], result);
+    
+    result = Aggregations.aggregate(Arrays.asList(4,102), stat);
+    assertResult(i, stat, expResults[i++], result);
+    
+    result = Aggregations.aggregate(Arrays.asList(102,0), stat);
+    assertResult(i, stat, expResults[i++], result);
+  }
+  
+  /* util to test Aggregations.aggregate(List<T>, getter, stat) */
+  protected void aggregateT(UnivariateAggregate stat, Double[] expResults) 
throws Exception {
+    
+    // for "continuous last-2" style inputs of (1, 4, 102, 0)
+    
+    int i = 0;
+    Double result;
+    List<Object> values;
+    
+    values = Arrays.asList(1);
+    result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), 
stat);
+    assertResult(i, stat, expResults[i++], result);
+    
+    values = Arrays.asList(1,4);
+    result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), 
stat);
+    assertResult(i, stat, expResults[i++], result);
+    
+    values = Arrays.asList(4,102);
+    result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), 
stat);
+    assertResult(i, stat, expResults[i++], result);
+    
+    values = Arrays.asList(102,0);
+    result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), 
stat);
+    assertResult(i, stat, expResults[i++], result);
+  }
+
+  /* util to test Aggregations.aggregateN(List<Number>, stats) */
+  protected void aggregateN(UnivariateAggregate[] stats) throws Exception {
+    
+    // for "continuous last-2" style inputs of (1, 4, 102, 0)
+    
+    int i = 0;
+    ResultMap result;
+    
+    result = Aggregations.aggregateN(Arrays.asList(1), stats);
+    assertResult(i++, stats, STAT_RESULTS, result);
+    
+    result = Aggregations.aggregateN(Arrays.asList(1,4), stats);
+    assertResult(i++, stats, STAT_RESULTS, result);
+    
+    result = Aggregations.aggregateN(Arrays.asList(4,102), stats);
+    assertResult(i++, stats, STAT_RESULTS, result);
+    
+    result = Aggregations.aggregateN(Arrays.asList(102,0), stats);
+    assertResult(i++, stats, STAT_RESULTS, result);
+  }
+  
+  /* util to test Aggregations.aggregateN(List<T>, getter, stats) */
+  protected void aggregateNT(UnivariateAggregate[] stats) throws Exception {
+    
+    // for "continuous last-2" style inputs of (1, 4, 102, 0)
+    
+    int i = 0;
+    ResultMap result;
+    List<Object> values;
+    
+    values = Arrays.asList(1);
+    result = Aggregations.aggregateN(values, o -> 
Double.valueOf(o.toString()), stats);
+    assertResult(i++, stats, STAT_RESULTS, result);
+    
+    values = Arrays.asList(1,4);
+    result = Aggregations.aggregateN(values, o -> 
Double.valueOf(o.toString()), stats);
+    assertResult(i++, stats, STAT_RESULTS, result);
+    
+    values = Arrays.asList(4,102);
+    result = Aggregations.aggregateN(values, o -> 
Double.valueOf(o.toString()), stats);
+    assertResult(i++, stats, STAT_RESULTS, result);
+    
+    values = Arrays.asList(102,0);
+    result = Aggregations.aggregateN(values, o -> 
Double.valueOf(o.toString()), stats);
+    assertResult(i++, stats, STAT_RESULTS, result);
+  }
+       
+  protected static void assertResult(int index, UnivariateAggregate stat, 
Double exp, Double act) {
+         if (exp == null && act == null)
+           return;  // match
+         assertFalse("index="+index+" "+stat.name()+" exp="+exp+" act="+act, 
exp == null || act == null);
+    assertEquals("index="+index+" "+stat.name(), exp, act, 0.01);
+       }
+  
+  protected static void assertResult(int i, UnivariateAggregate[] stats, 
Map<UnivariateAggregate,Double[]> expResults, ResultMap actResults) {
+    for (UnivariateAggregate stat : stats) {
+      Double act = actResults.get(stat);
+      Double exp = expResults.get(stat)[i]; 
+      
+      assertResult(i, stat, exp, act);
+    }
+  }
+       
+  /* util to test Aggregations.aggregate(List<Number>, stat) */
+  protected void testAggregate(UnivariateAggregate stat) throws Exception {
+    aggregate(stat, STAT_RESULTS.get(stat));
+  }
+  
+  @Test
+  public void testCOUNT() throws Exception {
+    testAggregate(Statistic2.COUNT);
+  }
+  
+  @Test
+  public void testMIN() throws Exception {
+    testAggregate(Statistic2.MIN);
+  }
+  
+  @Test
+  public void testMAX() throws Exception {
+    testAggregate(Statistic2.MAX);
+  }
+  
+  @Test
+  public void testSUM() throws Exception {
+    testAggregate(Statistic2.SUM);
+  }
+  
+  @Test
+  public void testMEAN() throws Exception {
+    testAggregate(Statistic2.MEAN);
+  }
+  
+  @Test
+  public void testSTDDEV() throws Exception {
+    testAggregate(Statistic2.STDDEV);
+  }
+  
+  @Test
+  public void testSLOPE() throws Exception {
+    testAggregate(Regression2.SLOPE);
+  }
+  
+  @Test
+  public void testEmptyAggregate() throws Exception {
+    Double result;
+    
+    result = Aggregations.aggregate(Collections.emptyList(), Statistic2.COUNT);
+    assertResult(0, Statistic2.COUNT, null, result);
+
+    result = Aggregations.aggregate(Collections.emptyList(), Statistic2.MIN);
+    assertResult(0, Statistic2.MIN, null, result);
+
+    // ...
+    
+    ResultMap resultMap =  Aggregations.aggregateN(Collections.emptyList(), 
Statistic2.MIN);
+    assertTrue(resultMap.isEmpty());
+    
+    resultMap =  Aggregations.aggregateN(Arrays.asList(1));
+    assertTrue(resultMap.isEmpty());
+    
+    resultMap =  Aggregations.aggregateN(Arrays.asList(1), new 
UnivariateAggregate[0]);
+    assertTrue(resultMap.isEmpty());
+  }
+  
+  @Test
+  public void testAggregateT() throws Exception {
+    aggregateT(Statistic2.MIN, STAT_RESULTS.get(Statistic2.MIN));
+  }
+  
+  /* test Aggregations.aggregateN(List<Number>, stats) */
+  @Test
+  public void testAggregateN() throws Exception {
+    aggregateN(STAT_RESULTS.keySet().toArray(new UnivariateAggregate[0]));
+  }
+  
+  /* test Aggregations.aggregateN(List<T>, getter, stats) */
+  @Test
+  public void testAggregateNT() throws Exception {
+    aggregateNT(STAT_RESULTS.keySet().toArray(new UnivariateAggregate[0]));
+  }
+  
+  /* test Aggregations.newResultToJson() */
+  @Test
+  public void testNewResultsToJson() throws Exception {
+    ResultMap result = Aggregations.newResults();
+    result.put(Statistic2.MIN, 2.5);
+    result.put(Statistic2.MAX, 4.5);
+    
+    JsonObject jo = Aggregations.newResultsToJson().apply(result);
+    
+    assertTrue(jo.get(Statistic2.MIN.name()) != null);
+    assertEquals(jo.get(Statistic2.MIN.name()).getAsDouble(), 2.5, 0.01);
+    assertTrue(jo.get(Statistic2.MAX.name()) != null);
+    assertEquals(jo.get(Statistic2.MAX.name()).getAsDouble(), 4.5, 0.01);
+  }
+  
+  /* test Aggregations.newMvResultToJson() */
+  @Test
+  public void testNewMvResultsToJson() throws Exception {
+    ResultMap var1result = Aggregations.newResults();
+    var1result.put(Statistic2.MIN, 2.5);
+    var1result.put(Statistic2.MAX, 4.5);
+    
+    ResultMap var2result = Aggregations.newResults();
+    var2result.put(Statistic2.SUM, 27.1);
+
+    MvResultMap result = Aggregations.newMvResults();
+    result.put("var1", var1result);
+    result.put("var2", var2result);
+
+    JsonObject jo = Aggregations.newMvResultsToJson().apply(result);
+
+    assertTrue(jo.get("var1") != null);
+    JsonObject joVar1 = jo.get("var1").getAsJsonObject();
+    assertTrue(joVar1.get(Statistic2.MIN.name()) != null);
+    assertEquals(joVar1.get(Statistic2.MIN.name()).getAsDouble(), 2.5, 0.01);
+    assertTrue(joVar1.get(Statistic2.MAX.name()) != null);
+    assertEquals(joVar1.get(Statistic2.MAX.name()).getAsDouble(), 4.5, 0.01);
+    
+    assertTrue(jo.get("var2") != null);
+    JsonObject joVar2 = jo.get("var2").getAsJsonObject();
+    assertTrue(joVar2.get(Statistic2.SUM.name()) != null);
+    assertEquals(joVar2.get(Statistic2.SUM.name()).getAsDouble(), 27.1, 0.01);
+  }
+
+  /* test Aggregations.aggregateN(list, stats) in a Stream/Window context */
+  @Test
+  public void testAggregateNStream() throws Exception {
+    // Aggregations.aggregate are ignorant of Stream/Window but
+    // to be safe verify a full Stream/Window based use case works
+    
+    UnivariateAggregate[] stats = STAT_RESULTS.keySet().toArray(new 
UnivariateAggregate[0]);
+    
+    Topology topology = newTopology("testAggregateNStream");
+    
+    // (1, 4, 102, 0)
+    TStream<Integer> sourceData = sourceData(topology);
+    
+    TWindow<Integer, Integer> window = sourceData.last(2, 
Functions.unpartitioned());
+    
+    TStream<ResultMap> aggregate = window.aggregate( (list,partition) -> {
+        return Aggregations.aggregateN(list, stats);
+    });
+    
+    Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 
4);
+    Condition<List<ResultMap>> contents = 
topology.getTester().streamContents(aggregate);
+    complete(topology, count);
+    assertTrue(count.valid());
+      
+    List<ResultMap> tuples = contents.getResult();
+    assertEquals(4, tuples.size());
+    
+    for (int i = 0; i < tuples.size(); i++) {
+      assertResult(i, stats, STAT_RESULTS, tuples.get(i));
+    }
+  }
+
+  /* test Aggregations.aggregateN(list, stats) in a multivariable 
Stream/Window context */
+  @Test
+  public void testMvAggregateNStream() throws Exception {
+    // Aggregations.aggregate are ignorant of Stream/Window but
+    // to be safe verify a full Stream/Window based use case works 
(TStream<MvResultMap>)
+    
+    UnivariateAggregate[] stats = { Statistic2.MIN, Statistic2.MAX };
+    
+    Map<UnivariateAggregate,Double[]> var2_STAT_RESULTS = new HashMap<>();
+    var2_STAT_RESULTS.put(Statistic2.MIN, new Double[] {1001.0, 1001.0, 
1004.0, 1000.0});
+    var2_STAT_RESULTS.put(Statistic2.MAX, new Double[] {1001.0, 1004.0, 
1102.0, 1102.0});
+    
+    
+    Topology topology = newTopology("testAggregateNStream");
+    
+    // (1, 4, 102, 0)
+    TStream<SensorReadings> sourceData = sourceData(topology)
+        .map(i -> new SensorReadings(i, i+1000));
+    
+    TWindow<SensorReadings, Integer> window = sourceData.last(2, 
Functions.unpartitioned());
+    
+    TStream<MvResultMap> aggregate = window.aggregate( (list,partition) -> {
+        ResultMap var1result = Aggregations.aggregateN(list, sr -> sr.var1, 
stats);
+        ResultMap var2result = Aggregations.aggregateN(list, sr -> sr.var2, 
stats);
+        MvResultMap result = Aggregations.newMvResults();
+        result.put("var1", var1result);
+        result.put("var2", var2result);
+        return result;
+    });
+    
+    Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 
4);
+    Condition<List<MvResultMap>> contents = 
topology.getTester().streamContents(aggregate);
+    complete(topology, count);
+    assertTrue(count.valid());
+      
+    List<MvResultMap> tuples = contents.getResult();
+    assertEquals(4, tuples.size());
+    
+    for (int i = 0; i < tuples.size(); i++) {
+      MvResultMap results = tuples.get(i);
+      assertResult(i, stats, STAT_RESULTS, results.get("var1"));
+      assertResult(i, stats, var2_STAT_RESULTS, results.get("var2"));
+    }
+  }
+  
+  protected static class SensorReadings {
+    int var1;
+    int var2;
+    SensorReadings(int var1, int var2) {
+      this.var1 = var1;
+      this.var2 = var2;
+    }
+  }
+  
+  protected static TStream<Integer> sourceData(Topology topology)
+  {
+      return topology.of(1, 4, 102, 0);
+  }
+  
+}

Reply via email to