[ https://issues.apache.org/jira/browse/GEARPUMP-316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040082#comment-16040082 ]
ASF GitHub Bot commented on GEARPUMP-316:
-----------------------------------------
Github user huafengw commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/186#discussion_r120526614
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+import java.util.function.Consumer
+
+import com.gs.collections.impl.map.mutable.UnifiedMap
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION,
+ GEARPUMP_STREAMING_OPERATOR}
+import org.apache.gearpump.streaming.dsl.window.impl.WindowRunner
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * Processes messages in groups as defined by groupBy function.
+ */
+class GroupByTask[IN, GROUP, OUT](
+ groupBy: IN => GROUP,
+ taskContext: TaskContext,
+ userConfig: UserConfig) extends Task(taskContext, userConfig) {
+
+ def this(context: TaskContext, conf: UserConfig) = {
+ this(
+ conf.getValue[IN => GROUP](GEARPUMP_STREAMING_GROUPBY_FUNCTION)(context.system).get,
+ context, conf
+ )
+ }
+
+ private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] =
+ new UnifiedMap[GROUP, WindowRunner[IN, OUT]]
+
+ override def onNext(message: Message): Unit = {
+ val input = message.value.asInstanceOf[IN]
+ val group = groupBy(input)
+
+ if (!groups.containsKey(group)) {
+ groups.put(group,
+ userConfig.getValue[WindowRunner[IN, OUT]](
+ GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get)
+ }
+
+ groups.get(group).process(input, message.timestamp)
+ }
+
+ override def onWatermarkProgress(watermark: Instant): Unit = {
+ groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
--- End diff --
It looks like you just create a Consumer here, but will it actually be called?
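For reference, here is a minimal stand-alone sketch of the pattern in the diff: one runner per group, created on the first message for that group, plus a watermark callback that visits every runner through `Iterable.forEach` with an anonymous `Consumer`. `Iterable.forEach` invokes the consumer's `accept` once per element, so whether anything observable happens depends on the body of `accept`, which the diff excerpt cuts off. Everything here is hypothetical and simplified: `SimpleRunner` stands in for `WindowRunner`, `java.util.HashMap` replaces gs-collections' `UnifiedMap`, timestamps are plain `Long`s instead of `Instant`, and `Map.computeIfAbsent` is swapped in for the diff's `containsKey`/`put`/`get` sequence.

```scala
import java.util.{HashMap => JHashMap}
import java.util.function.{Consumer, Function => JFunction}

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for WindowRunner: buffers (value, timestamp) pairs
// and releases those older than the watermark when triggered.
final class SimpleRunner[IN] {
  private val buffer = ArrayBuffer.empty[(IN, Long)]

  def process(in: IN, timestamp: Long): Unit = buffer += ((in, timestamp))

  def trigger(watermark: Long): List[IN] = {
    val (ready, pending) = buffer.partition(_._2 < watermark)
    buffer.clear()
    buffer ++= pending
    ready.map(_._1).toList
  }
}

// Hypothetical, simplified analogue of GroupByTask; Gearpump's Task,
// TaskContext and UserConfig are left out on purpose.
final class SimpleGroupBy[IN, GROUP](groupBy: IN => GROUP) {
  private val groups = new JHashMap[GROUP, SimpleRunner[IN]]
  val emitted = ArrayBuffer.empty[IN]

  def onNext(in: IN, timestamp: Long): Unit = {
    // computeIfAbsent creates the group's runner the first time the group
    // is seen, in one lookup instead of containsKey / put / get.
    val runner = groups.computeIfAbsent(groupBy(in),
      new JFunction[GROUP, SimpleRunner[IN]] {
        override def apply(g: GROUP): SimpleRunner[IN] = new SimpleRunner[IN]
      })
    runner.process(in, timestamp)
  }

  def onWatermarkProgress(watermark: Long): Unit = {
    // forEach calls accept once for every runner currently in the map.
    groups.values.forEach(new Consumer[SimpleRunner[IN]] {
      override def accept(runner: SimpleRunner[IN]): Unit =
        emitted ++= runner.trigger(watermark)
    })
  }
}
```

With `groupBy = _.head`, feeding "apple" at 1, "banana" at 2 and "avocado" at 5, then a watermark of 3, releases "apple" and "banana"; "avocado" stays buffered in the "a" runner until a later watermark passes 5.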
> Don't enforce groupBy after window
> ----------------------------------
>
> Key: GEARPUMP-316
> URL: https://issues.apache.org/jira/browse/GEARPUMP-316
> Project: Apache Gearpump
> Issue Type: Sub-task
> Components: streaming
> Reporter: Manu Zhang
> Assignee: Manu Zhang
>
> Return a normal Stream instead of a WindowStream from the window function. The
> window function defines a boundary (window) for elements, and the following
> operations should fall within the corresponding boundaries. The boundary should
> not change until a new window function is defined. The default boundary is
> {{GlobalWindows}} if none is defined.
> This means there will be a window context for each underlying task. Elements
> are emitted according to the trigger semantics.
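The semantics described in the issue can be modelled in a few lines. The sketch below is a toy, not Gearpump's DSL: `ToyStream`, `Windows`, `assign` and `countPerWindow` are hypothetical names, and only `GlobalWindows` and `FixedWindows` echo the issue's wording. It shows three things under those assumptions: the window boundary is carried through a later `map` until `window(...)` is called again, the global window is the default, and a simple event-time trigger emits a window's result once the watermark reaches the window's end.

```scala
// Hypothetical window assigners mapping a timestamp to a [start, end) boundary.
sealed trait Windows { def assign(timestamp: Long): (Long, Long) }

case object GlobalWindows extends Windows {
  def assign(timestamp: Long): (Long, Long) = (Long.MinValue, Long.MaxValue)
}

final case class FixedWindows(sizeMs: Long) extends Windows {
  def assign(timestamp: Long): (Long, Long) = {
    val start = timestamp - Math.floorMod(timestamp, sizeMs)
    (start, start + sizeMs)
  }
}

// Toy stream of (value, timestamp) pairs. The boundary travels with the
// stream and only changes when window() is called again.
final case class ToyStream[T](elems: Seq[(T, Long)], windows: Windows = GlobalWindows) {
  def window(w: Windows): ToyStream[T] = copy(windows = w)

  // A normal Stream operation: it keeps whatever boundary is in effect.
  def map[U](f: T => U): ToyStream[U] =
    ToyStream(elems.map { case (t, ts) => (f(t), ts) }, windows)

  // Simple event-time trigger: a window's count is emitted only once the
  // watermark has reached the window's end.
  def countPerWindow(watermark: Long): Map[(Long, Long), Int] =
    elems.groupBy { case (_, ts) => windows.assign(ts) }
      .collect { case (w, xs) if w._2 <= watermark => w -> xs.size }
}
```

Because the global window never ends, a stream without an explicit `window(...)` emits nothing under this particular trigger; a real system would pair the global window with a different trigger.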
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)