Repository: spark
Updated Branches:
refs/heads/branch-1.0 0441515f2 -> 08c4d1112
SPARK-1663. Corrections for several compile errors in streaming code examples,
and updates to follow API changes
I gave the Streaming code examples, both Scala and Java, a test run today. I
turned up a number of small errors, mostly compile errors in the Java examples.
There were a few typos in the Scala examples too.
I also took the liberty of adding things like imports, since in several cases
they are not obvious. Feel free to push back on some changes.
There's one thing I haven't quite addressed in the changes. `JavaPairDStream`
uses the Java API version of `Function2` in almost all cases, as `JFunction2`.
However it uses `scala.Function2` in:
```
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
  :JavaPairDStream[K, V] = {
  dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
}
```
Is that a typo?
Also, in Scala, I could not get this to compile:
```
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
error: missing parameter type for expanded function ((x$1, x$2) => x$1.$plus(x$2))
```
You can see my fix below, but am I missing something?
Otherwise I can say these all worked for me!
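As one illustration of the Java changes in the diff below, the `flatMap` example now returns `Arrays.asList(x.split(" "))` instead of `Lists.newArrayList(x.split(" "))`. The following is a minimal standalone sketch of that call using only the JDK. The `SplitWords` class and `splitLine` method are hypothetical names chosen for this sketch; they are not part of Spark or the programming guide.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone class for illustration only.
class SplitWords {
    // Splits a line on single spaces, mirroring the body of the
    // FlatMapFunction.call method in the guide's Java example.
    static List<String> splitLine(String line) {
        // Arrays.asList wraps the String[] in a fixed-size List view.
        return Arrays.asList(line.split(" "));
    }

    public static void main(String[] args) {
        // prints [hello, streaming, world]
        System.out.println(splitLine("hello streaming world"));
    }
}
```

Note that the list returned by `Arrays.asList` is fixed-size, which is fine here because the result is only iterated.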
Author: Sean Owen <[email protected]>
Closes #589 from srowen/SPARK-1663 and squashes the following commits:
65a906b [Sean Owen] Corrections for several compile errors in streaming code
examples, and updates to follow API changes
(cherry picked from commit 11d54941760f86706e28f7ace8ece664c9164ba6)
Signed-off-by: Patrick Wendell <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08c4d111
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08c4d111
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08c4d111
Branch: refs/heads/branch-1.0
Commit: 08c4d111226fefbc446485a7a885db5f3a3cf1b1
Parents: 0441515
Author: Sean Owen <[email protected]>
Authored: Sat May 3 12:31:31 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Sat May 3 12:31:47 2014 -0700
----------------------------------------------------------------------
docs/streaming-programming-guide.md | 62 ++++++++++++++++++--------------
1 file changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/08c4d111/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 7ad0642..b22bb45 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -76,16 +76,19 @@ Besides Spark's configuration, we specify that any DStream will be processed
in 1 second batches.
{% highlight scala %}
-// Create a StreamingContext with a SparkConf configuration
-val ssc = new StreamingContext(sparkConf, Seconds(1))
+import org.apache.spark.api.java.function._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.api._
+// Create a StreamingContext with a local master
+val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))
{% endhighlight %}
Using this context, we then create a new DStream
by specifying the IP address and port of the data server.
{% highlight scala %}
-// Create a DStream that will connect to serverIP:serverPort
-val lines = ssc.socketTextStream(serverIP, serverPort)
+// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+val lines = ssc.socketTextStream("localhost", 9999)
{% endhighlight %}
This `lines` DStream represents the stream of data that will be received from the data
@@ -103,6 +106,7 @@ each line will be split into multiple words and the stream of words is represent
`words` DStream. Next, we want to count these words.
{% highlight scala %}
+import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
@@ -138,16 +142,20 @@ functionality. Besides Spark's configuration, we specify that any DStream would
in 1 second batches.
{% highlight java %}
-// Create a StreamingContext with a SparkConf configuration
-JavaStreamingContext jssc = StreamingContext(sparkConf, new Duration(1000))
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import scala.Tuple2;
+// Create a StreamingContext with a local master
+JavaStreamingContext jssc = new JavaStreamingContext("local", "JavaNetworkWordCount", new Duration(1000))
{% endhighlight %}
Using this context, we then create a new DStream
by specifying the IP address and port of the data server.
{% highlight java %}
-// Create a DStream that will connect to serverIP:serverPort
-JavaDStream<String> lines = jssc.socketTextStream(serverIP, serverPort);
+// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
{% endhighlight %}
This `lines` DStream represents the stream of data that will be received from the data
@@ -159,7 +167,7 @@ space into words.
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split(" "));
+ return Arrays.asList(x.split(" "));
}
});
{% endhighlight %}
@@ -359,7 +367,7 @@ as explained earlier. Finally, the last two parameters are needed to deploy your
if running in distributed mode, as described in the
[Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
Additionally, the underlying SparkContext can be accessed as
-`streamingContext.sparkContext`.
+`ssc.sparkContext`.
The batch interval must be set based on the latency requirements of your application
and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
@@ -399,7 +407,7 @@ These operations are discussed in detail in later sections.
## Input Sources
-We have already taken a look at the `streamingContext.socketTextStream(...)` in the [quick
+We have already taken a look at the `ssc.socketTextStream(...)` in the [quick
example](#a-quick-example) which creates a DStream from text
data received over a TCP socket connection. Besides sockets, the core Spark Streaming API provides
methods for creating DStreams from files and Akka actors as input sources.
@@ -409,12 +417,12 @@ Specifically, for files, the DStream can be created as
<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
-streamingContext.fileStream(dataDirectory)
+ssc.fileStream(dataDirectory)
{% endhighlight %}
</div>
<div data-lang="java">
{% highlight java %}
-javaStreamingContext.fileStream(dataDirectory);
+jssc.fileStream(dataDirectory);
{% endhighlight %}
</div>
</div>
@@ -443,13 +451,13 @@ project dependencies, you can create a DStream from Kafka as
<div data-lang="scala">
{% highlight scala %}
import org.apache.spark.streaming.kafka._
-KafkaUtils.createStream(streamingContext, kafkaParams, ...)
+KafkaUtils.createStream(ssc, kafkaParams, ...)
{% endhighlight %}
</div>
<div data-lang="java">
{% highlight java %}
-import org.apache.spark.streaming.kafka.*
-KafkaUtils.createStream(javaStreamingContext, kafkaParams, ...);
+import org.apache.spark.streaming.kafka.*;
+KafkaUtils.createStream(jssc, kafkaParams, ...);
{% endhighlight %}
</div>
</div>
@@ -578,13 +586,14 @@ val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
<div data-lang="java" markdown="1">
{% highlight java %}
+import com.google.common.base.Optional;
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
Integer newSum = ... // add the new values with the previous running count to get the new count
- return Optional.of(newSum)
+ return Optional.of(newSum);
}
- }
+ };
{% endhighlight %}
This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
@@ -617,9 +626,9 @@ spam information (maybe generated with Spark as well) and then filtering based o
<div data-lang="scala" markdown="1">
{% highlight scala %}
-val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information
+val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
-val cleanedDStream = inputDStream.transform(rdd => {
+val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})
@@ -629,13 +638,14 @@ val cleanedDStream = inputDStream.transform(rdd => {
<div data-lang="java" markdown="1">
{% highlight java %}
+import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
-JavaPairRDD<String, Double> spamInfoRDD = javaSparkContext.hadoopFile(...);
+final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
-JavaPairDStream<String, Integer> cleanedDStream = inputDStream.transform(
+JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
- rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
+ rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
...
}
});
@@ -684,7 +694,7 @@ operation `reduceByKeyAndWindow`.
{% highlight scala %}
// Reduce last 30 seconds of data, every 10 seconds
-val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
+val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
{% endhighlight %}
</div>
@@ -699,7 +709,7 @@ Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer
};
// Reduce last 30 seconds of data, every 10 seconds
-JavaPairDStream<String, Integer> windowedWordCounts = pair.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
+JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
{% endhighlight %}
</div>
@@ -1087,7 +1097,7 @@ This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This i
{% highlight java %}
// Create a factory object that can create a and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
- JavaStreamingContextFactory create() {
+ @Override public JavaStreamingContext create() {
JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams
...