[
https://issues.apache.org/jira/browse/GEARPUMP-316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046692#comment-16046692
]
ASF GitHub Bot commented on GEARPUMP-316:
-----------------------------------------
Github user huafengw commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/186#discussion_r121446246
--- Diff:
streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
---
@@ -19,133 +19,121 @@ package org.apache.gearpump.streaming.dsl.window.impl
import java.time.Instant
-import akka.actor.ActorSystem
import com.gs.collections.api.block.predicate.Predicate
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import com.gs.collections.api.block.procedure.{Procedure, Procedure2}
+import com.gs.collections.api.block.procedure.Procedure
import com.gs.collections.impl.list.mutable.FastList
-import com.gs.collections.impl.map.mutable.UnifiedMap
import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
-import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.window.api.Discarding
-import org.apache.gearpump.streaming.task.TaskContext
-import org.apache.gearpump.util.LogUtil
-import org.slf4j.Logger
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
+import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
+import scala.collection.mutable.ArrayBuffer
-trait WindowRunner {
+trait WindowRunner[IN, OUT] extends java.io.Serializable {
- def process(message: Message): Unit
+ def process(in: IN, time: Instant): Unit
- def trigger(time: Instant): Unit
+ def trigger(time: Instant): TraversableOnce[(OUT, Instant)]
}
-object DefaultWindowRunner {
+case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
+ right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
- private val LOG: Logger =
LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
+ def process(in: IN, time: Instant): Unit = {
+ left.process(in, time)
+ }
+
+ def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
+ left.trigger(time).foreach(result => right.process(result._1,
result._2))
+ right.trigger(time)
+ }
}
-class DefaultWindowRunner[IN, GROUP, OUT](
- taskContext: TaskContext, userConfig: UserConfig,
- groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
- extends WindowRunner {
-
- private val windowFn = groupBy.window.windowFn
- private val groupedWindowInputs = new UnifiedMap[GROUP,
TreeSortedMap[Window, FastList[IN]]]
- private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN,
OUT]]
- private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
-
- override def process(message: Message): Unit = {
- val input = message.value.asInstanceOf[IN]
- val (group, windows) = groupBy.groupBy(message)
- if (!groupedWindowInputs.containsKey(group)) {
- groupedWindowInputs.put(group, new TreeSortedMap[Window,
FastList[IN]]())
- }
- val windowInputs = groupedWindowInputs.get(group)
- windows.foreach { win =>
+class DefaultWindowRunner[IN, OUT](
+ windows: Windows,
+ fnRunner: FunctionRunner[IN, OUT])
+ extends WindowRunner[IN, OUT] {
+
+ private val windowFn = windows.windowFn
+ private val windowInputs = new TreeSortedMap[Window, FastList[(IN,
Instant)]]
+ private var setup = false
+
+ override def process(in: IN, time: Instant): Unit = {
+ val wins = windowFn(new Context[IN] {
+ override def element: IN = in
+
+ override def timestamp: Instant = time
+ })
+ wins.foreach { win =>
if (windowFn.isNonMerging) {
if (!windowInputs.containsKey(win)) {
- val inputs = new FastList[IN](1)
+ val inputs = new FastList[(IN, Instant)]
windowInputs.put(win, inputs)
}
- windowInputs.get(win).add(input)
+ windowInputs.get(win).add(in -> time)
} else {
- merge(windowInputs, win, input)
+ merge(windowInputs, win, in, time)
}
}
- if (!groupedFnRunners.containsKey(group)) {
- val runner = userConfig.getValue[FunctionRunner[IN,
OUT]](GEARPUMP_STREAMING_OPERATOR).get
- groupedFnRunners.put(group, runner)
- groupedRunnerSetups.put(group, false)
- }
-
- def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win:
Window, input: IN): Unit = {
- val intersected = windowInputs.keySet.select(new Predicate[Window] {
+ def merge(
+ winIns: TreeSortedMap[Window, FastList[(IN, Instant)]],
+ win: Window, in: IN, time: Instant): Unit = {
+ val intersected = winIns.keySet.select(new Predicate[Window] {
override def accept(each: Window): Boolean = {
win.intersects(each)
}
})
var mergedWin = win
- val mergedInputs = FastList.newListWith(input)
+ val mergedInputs = FastList.newListWith(in -> time)
intersected.forEach(new Procedure[Window] {
override def value(each: Window): Unit = {
mergedWin = mergedWin.span(each)
- mergedInputs.addAll(windowInputs.remove(each))
+ mergedInputs.addAll(winIns.remove(each))
}
})
- windowInputs.put(mergedWin, mergedInputs)
+ winIns.put(mergedWin, mergedInputs)
}
-
}
- override def trigger(time: Instant): Unit = {
- groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP,
TreeSortedMap[Window, FastList[IN]]] {
- override def value(group: GROUP, windowInputs: TreeSortedMap[Window,
FastList[IN]]): Unit = {
- onTrigger(group, windowInputs)
- }
- })
-
+ override def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
@annotation.tailrec
- def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window,
FastList[IN]]): Unit = {
+ def onTrigger(outputs: ArrayBuffer[(OUT, Instant)]):
TraversableOnce[(OUT, Instant)] = {
if (windowInputs.notEmpty()) {
val firstWin = windowInputs.firstKey
if (!time.isBefore(firstWin.endTime)) {
val inputs = windowInputs.remove(firstWin)
- if (groupedFnRunners.containsKey(group)) {
- val runner =
FunctionRunner.withEmitFn(groupedFnRunners.get(group),
- (output: OUT) => {
- taskContext.output(Message(output, time))
- })
- val setup = groupedRunnerSetups.get(group)
- if (!setup) {
- runner.setup()
- groupedRunnerSetups.put(group, true)
- }
- inputs.forEach(new Procedure[IN] {
- override def value(t: IN): Unit = {
- // .toList forces eager evaluation
- runner.process(t).toList
+ if (!setup) {
+ fnRunner.setup()
+ setup = true
+ }
+ inputs.forEach(new Procedure[(IN, Instant)] {
+ override def value(v: (IN, Instant)): Unit = {
+ fnRunner.process(v._1).foreach {
+ out: OUT => outputs += (out -> v._2)
}
- })
- // .toList forces eager evaluation
- runner.finish().toList
- if (groupBy.window.accumulationMode == Discarding) {
- runner.teardown()
- groupedRunnerSetups.put(group, false)
- // dicarding, setup need to be called for each window
- onTrigger(group, windowInputs)
- } else {
- // accumulating, setup is only called for the first window
- onTrigger(group, windowInputs)
}
+ })
+ fnRunner.finish().foreach {
+ out: OUT => outputs += (out -> firstWin.endTime.minusMillis(1))
--- End diff --
I'm wondering: does the timestamp order of the messages in `outputs` matter? If so, is
it guaranteed?
> Don't enforce groupBy after window
> ----------------------------------
>
> Key: GEARPUMP-316
> URL: https://issues.apache.org/jira/browse/GEARPUMP-316
> Project: Apache Gearpump
> Issue Type: Sub-task
> Components: streaming
> Reporter: Manu Zhang
> Assignee: Manu Zhang
>
> Return a normal Stream instead of a WindowStream from the window function. A
> window function defines a boundary (window) for elements, and subsequent
> operations should fall within the corresponding boundaries. The boundary
> should not change until a new window function is defined. If no window
> function is defined, the default boundary is {{GlobalWindows}}.
> This means there will be a window context for each underlying task. Elements
> are emitted according to the trigger semantics.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)