GitHub user manuzhang commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/186#discussion_r121539311
--- Diff:
streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
---
@@ -19,133 +19,121 @@ package org.apache.gearpump.streaming.dsl.window.impl
import java.time.Instant
-import akka.actor.ActorSystem
import com.gs.collections.api.block.predicate.Predicate
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import com.gs.collections.api.block.procedure.{Procedure, Procedure2}
+import com.gs.collections.api.block.procedure.Procedure
import com.gs.collections.impl.list.mutable.FastList
-import com.gs.collections.impl.map.mutable.UnifiedMap
import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
-import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.window.api.Discarding
-import org.apache.gearpump.streaming.task.TaskContext
-import org.apache.gearpump.util.LogUtil
-import org.slf4j.Logger
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
+import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
+import scala.collection.mutable.ArrayBuffer
-trait WindowRunner {
+trait WindowRunner[IN, OUT] extends java.io.Serializable {
- def process(message: Message): Unit
+ def process(in: IN, time: Instant): Unit
- def trigger(time: Instant): Unit
+ def trigger(time: Instant): TraversableOnce[(OUT, Instant)]
}
-object DefaultWindowRunner {
+case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
+ right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
- private val LOG: Logger =
LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
+ def process(in: IN, time: Instant): Unit = {
+ left.process(in, time)
+ }
+
+ def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
+ left.trigger(time).foreach(result => right.process(result._1,
result._2))
+ right.trigger(time)
+ }
}
-class DefaultWindowRunner[IN, GROUP, OUT](
- taskContext: TaskContext, userConfig: UserConfig,
- groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
- extends WindowRunner {
-
- private val windowFn = groupBy.window.windowFn
- private val groupedWindowInputs = new UnifiedMap[GROUP,
TreeSortedMap[Window, FastList[IN]]]
- private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN,
OUT]]
- private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
-
- override def process(message: Message): Unit = {
- val input = message.value.asInstanceOf[IN]
- val (group, windows) = groupBy.groupBy(message)
- if (!groupedWindowInputs.containsKey(group)) {
- groupedWindowInputs.put(group, new TreeSortedMap[Window,
FastList[IN]]())
- }
- val windowInputs = groupedWindowInputs.get(group)
- windows.foreach { win =>
+class DefaultWindowRunner[IN, OUT](
+ windows: Windows,
+ fnRunner: FunctionRunner[IN, OUT])
+ extends WindowRunner[IN, OUT] {
+
+ private val windowFn = windows.windowFn
+ private val windowInputs = new TreeSortedMap[Window, FastList[(IN,
Instant)]]
+ private var setup = false
+
+ override def process(in: IN, time: Instant): Unit = {
+ val wins = windowFn(new Context[IN] {
+ override def element: IN = in
+
+ override def timestamp: Instant = time
+ })
+ wins.foreach { win =>
if (windowFn.isNonMerging) {
if (!windowInputs.containsKey(win)) {
- val inputs = new FastList[IN](1)
+ val inputs = new FastList[(IN, Instant)]
windowInputs.put(win, inputs)
}
- windowInputs.get(win).add(input)
+ windowInputs.get(win).add(in -> time)
} else {
- merge(windowInputs, win, input)
+ merge(windowInputs, win, in, time)
}
}
- if (!groupedFnRunners.containsKey(group)) {
- val runner = userConfig.getValue[FunctionRunner[IN,
OUT]](GEARPUMP_STREAMING_OPERATOR).get
- groupedFnRunners.put(group, runner)
- groupedRunnerSetups.put(group, false)
- }
-
- def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win:
Window, input: IN): Unit = {
- val intersected = windowInputs.keySet.select(new Predicate[Window] {
+ def merge(
+ winIns: TreeSortedMap[Window, FastList[(IN, Instant)]],
+ win: Window, in: IN, time: Instant): Unit = {
+ val intersected = winIns.keySet.select(new Predicate[Window] {
override def accept(each: Window): Boolean = {
win.intersects(each)
}
})
var mergedWin = win
- val mergedInputs = FastList.newListWith(input)
+ val mergedInputs = FastList.newListWith(in -> time)
intersected.forEach(new Procedure[Window] {
override def value(each: Window): Unit = {
mergedWin = mergedWin.span(each)
- mergedInputs.addAll(windowInputs.remove(each))
+ mergedInputs.addAll(winIns.remove(each))
}
})
- windowInputs.put(mergedWin, mergedInputs)
+ winIns.put(mergedWin, mergedInputs)
}
-
}
- override def trigger(time: Instant): Unit = {
- groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP,
TreeSortedMap[Window, FastList[IN]]] {
- override def value(group: GROUP, windowInputs: TreeSortedMap[Window,
FastList[IN]]): Unit = {
- onTrigger(group, windowInputs)
- }
- })
-
+ override def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
@annotation.tailrec
- def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window,
FastList[IN]]): Unit = {
+ def onTrigger(outputs: ArrayBuffer[(OUT, Instant)]):
TraversableOnce[(OUT, Instant)] = {
if (windowInputs.notEmpty()) {
val firstWin = windowInputs.firstKey
if (!time.isBefore(firstWin.endTime)) {
val inputs = windowInputs.remove(firstWin)
- if (groupedFnRunners.containsKey(group)) {
- val runner =
FunctionRunner.withEmitFn(groupedFnRunners.get(group),
- (output: OUT) => {
- taskContext.output(Message(output, time))
- })
- val setup = groupedRunnerSetups.get(group)
- if (!setup) {
- runner.setup()
- groupedRunnerSetups.put(group, true)
- }
- inputs.forEach(new Procedure[IN] {
- override def value(t: IN): Unit = {
- // .toList forces eager evaluation
- runner.process(t).toList
+ if (!setup) {
+ fnRunner.setup()
+ setup = true
+ }
+ inputs.forEach(new Procedure[(IN, Instant)] {
+ override def value(v: (IN, Instant)): Unit = {
+ fnRunner.process(v._1).foreach {
+ out: OUT => outputs += (out -> v._2)
}
- })
- // .toList forces eager evaluation
- runner.finish().toList
- if (groupBy.window.accumulationMode == Discarding) {
- runner.teardown()
- groupedRunnerSetups.put(group, false)
- // dicarding, setup need to be called for each window
- onTrigger(group, windowInputs)
- } else {
- // accumulating, setup is only called for the first window
- onTrigger(group, windowInputs)
}
+ })
+ fnRunner.finish().foreach {
+ out: OUT => outputs += (out -> firstWin.endTime.minusMillis(1))
--- End diff --
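Yes, emitting the `finish()` outputs at `firstWin.endTime.minusMillis(1)` could make the
output minClock move backwards (i.e., not monotonically increasing). I created
https://issues.apache.org/jira/browse/GEARPUMP-317 to track this.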
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---