[
https://issues.apache.org/jira/browse/GEARPUMP-32?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395359#comment-15395359
]
ASF GitHub Bot commented on GEARPUMP-32:
----------------------------------------
Github user huafengw commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/67#discussion_r72413298
--- Diff:
examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaWriter.scala
---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.kafka
+
+import java.util.{Random, Properties}
+
+import akka.actor.ActorSystem
+import com.twitter.bijection.Injection
+import org.apache.gearpump.{TimeStamp, Message}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ParseResult, CLIOption, ArgumentsParser}
+import org.apache.gearpump.partitioner.ShufflePartitioner
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.kafka.KafkaSink
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.{DataSource, DataSourceProcessor}
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.{Graph, LogUtil, AkkaApp}
+import org.apache.gearpump.util.Graph._
+import org.slf4j.Logger
+
+object KafkaWriter extends AkkaApp with ArgumentsParser {
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "sink" -> CLIOption[Int]("<how many kafka processor tasks>", required = false,
+      defaultValue = Some(1)),
+    "brokerList" -> CLIOption[String]("<broker server list string>", required = false,
+      defaultValue = Some("localhost:9092")),
+    "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false,
+      defaultValue = Some("topic2"))
+  )
+
+  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
+ implicit val actorSystem = system
+ val appName = "KafkaWriter"
+ val sinkNum = config.getInt("sink")
+ val brokerList = config.getString("brokerList")
+ val sinkTopic = config.getString("sinkTopic")
+
+ val source = new RandomSource
+ val sourceProcessor = DataSourceProcessor(source, 1)
+ val appConfig = UserConfig.empty
+ val props = new Properties
+ props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val sink = new KafkaSink(sinkTopic, props)
+ val sinkProcessor = DataSinkProcessor(sink, sinkNum)
+ val partitioner = new ShufflePartitioner
+ val graph = sourceProcessor ~ partitioner ~> sinkProcessor
+ val app = StreamApplication(appName, Graph(graph), appConfig)
+ app
+ }
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ val context = ClientContext(akkaConf)
+ val appId = context.submit(application(config, context.system))
+ context.close()
+ }
+
+ class RandomSource extends DataSource {
+
+ private var count = 0
+ private var lowerBound = 0
+ private val step = 10
+ private val random = new Random
+
+    override def open(context: TaskContext, startTime: TimeStamp): Unit = {}
+
+ override def close(): Unit = {}
+
+ override def read(): Message = {
+ // event times are not in order among every 10 messages
+ val number = random.nextInt(10) + lowerBound
+ val msg = Message(
+        Injection[Long, Array[Byte]](number) -> Injection[Long, Array[Byte]](number))
+ count += 1
+ if (count == step) {
--- End diff --
count % step == 0 ?
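The reviewer's point can be illustrated with a minimal sketch. The body of the `if` branch is cut off in the diff, so the `lowerBound` update below is an assumption about the intended logic; the point is that `count == step` is true only once (since `count` is never reset in the visible code), while `count % step == 0` fires on every `step`-th message:

```scala
// Hypothetical sketch of the read() counter logic under review.
// The lowerBound advance inside the branch is assumed, not shown in the diff.
object ReadCounterSketch {
  var count = 0
  var lowerBound = 0
  val step = 10

  def onRead(): Unit = {
    count += 1
    if (count % step == 0) { // reviewer's suggestion: fires every `step` messages
      lowerBound += step     // assumed: advance the out-of-order event-time window
    }
  }
}
```

After 30 calls to `onRead()`, the branch has fired three times and `lowerBound` is 30; with `count == step` it would have fired only once.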
> Minimum clock of source Tasks maybe inaccurate
> ----------------------------------------------
>
> Key: GEARPUMP-32
> URL: https://issues.apache.org/jira/browse/GEARPUMP-32
> Project: Apache Gearpump
> Issue Type: Bug
> Components: streaming
> Affects Versions: 0.8.0
> Reporter: Manu Zhang
> Assignee: Manu Zhang
> Fix For: 0.8.1
>
>
> Moved from [https://github.com/gearpump/gearpump/issues/1835] and reported by
> [Zhu Yueqian|https://github.com/yueqianzhu]
> {quote}
> Source tasks don't have any upstream clocks, so the startClock is the minimum of
> the pending clocks when recovery happens.
> e.g.:
> source task1: timestamp 15, not acked, minClockValue may be 15 (<= 15).
> source task2: timestamp 10, acked, minClockValue may be Long.MaxValue.
> When recovery happens, the startClock may be 15. Where is the data between 10 and
> 15 at task2?
> {quote}
> More context on this issue:
> In Gearpump, we maintain a global minimum clock, tracked from message timestamps
> across all tasks. It guarantees that all messages with timestamps before this
> clock have been processed. An application restarts from this value on failure,
> and thus at-least-once message delivery can be guaranteed.
> The global minimum clock is the lower bound of all the tasks' minimum clocks.
> For a task, the minimum clock is the lower of:
> # the upstream minimum clock
> # a. the minimum timestamp of unacked messages, or
> b. Long.MaxValue if all messages have been acked.
>
> Note that 2.b allows the global minimum clock to progress, and it is almost safe
> since the clock is also bounded by the upstream minimum clock. I say "almost
> safe" because a source task has no upstream, but we assume its upstream minimum
> clock is Long.MaxValue. Thus, the scenario described by Zhu Yueqian can happen
> and break the at-least-once guarantee.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)