Repository: spark Updated Branches: refs/heads/branch-2.1 517f39833 -> d489e1dc7
[SPARK-19041][SS] Fix code snippet compilation issues in Structured Streaming Programming Guide ## What changes were proposed in this pull request? Currently some code snippets in the programming guide just do not compile. We should fix them. ## How was this patch tested? ``` SKIP_API=1 jekyll build ``` ## Screenshot from part of the change:  Author: Liwei Lin <[email protected]> Closes #16442 from lw-lin/ss-pro-guide-. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d489e1dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d489e1dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d489e1dc Branch: refs/heads/branch-2.1 Commit: d489e1dc7ecf7cf081141d3f45f86c39fc3db1fe Parents: 517f398 Author: Liwei Lin <[email protected]> Authored: Mon Jan 2 14:40:06 2017 +0000 Committer: Shixiong Zhu <[email protected]> Committed: Mon Jan 2 11:58:29 2017 -0800 ---------------------------------------------------------------------- docs/structured-streaming-programming-guide.md | 87 ++++++++++++--------- 1 file changed, 51 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d489e1dc/docs/structured-streaming-programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 3b7d0c4..799f636 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -537,9 +537,9 @@ Most of the common operations on DataFrame/Dataset are supported for streaming. <div data-lang="scala" markdown="1"> {% highlight scala %} -case class DeviceData(device: String, type: String, signal: Double, time: DateTime) +case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime) -val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: string } +val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string } val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data // Select the devices which have signal more than 10 @@ -547,11 +547,11 @@ df.select("device").where("signal > 10") // using untyped APIs ds.filter(_.signal > 10).map(_.device) // using typed APIs // Running count of the number of updates for each device type -df.groupBy("type").count() // using untyped API +df.groupBy("deviceType").count() // using untyped API // Running average signal for each device type -import org.apache.spark.sql.expressions.scalalang.typed._ -ds.groupByKey(_.type).agg(typed.avg(_.signal)) // using typed API +import org.apache.spark.sql.expressions.scalalang.typed +ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API {% endhighlight %} </div> @@ -565,7 +565,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; public class DeviceData { private String device; - private String type; + private String deviceType; private Double signal; private java.sql.Date time; ... @@ -590,13 +590,13 @@ ds.filter(new FilterFunction<DeviceData>() { // using typed APIs }, Encoders.STRING()); // Running count of the number of updates for each device type -df.groupBy("type").count(); // using untyped API +df.groupBy("deviceType").count(); // using untyped API // Running average signal for each device type ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API @Override public String call(DeviceData value) throws Exception { - return value.getType(); + return value.getDeviceType(); } }, Encoders.STRING()).agg(typed.avg(new MapFunction<DeviceData, Double>() { @Override @@ -611,13 +611,13 @@ ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API <div data-lang="python" markdown="1"> {% highlight python %} -df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType } +df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType } # Select the devices which have signal more than 10 df.select("device").where("signal > 10") # Running count of the number of updates for each device type -df.groupBy("type").count() +df.groupBy("deviceType").count() {% endhighlight %} </div> </div> @@ -973,7 +973,7 @@ Here is a table of all the sinks, and the corresponding settings. <tr> <td><b>File Sink</b></td> <td>Append</td> - <td><pre>writeStream<br/> .format("parquet")<br/> .start()</pre></td> + <td><pre>writeStream<br/> .format("parquet")<br/> .option(<br/> "checkpointLocation",<br/> "path/to/checkpoint/dir")<br/> .option(<br/> "path",<br/> "path/to/destination/dir")<br/> .start()</pre></td> <td>Yes</td> <td>Supports writes to partitioned tables. Partitioning by time may be useful.</td> </tr> @@ -1026,7 +1026,9 @@ noAggDF // Write new data to Parquet files noAggDF .writeStream - .parquet("path/to/destination/directory") + .format("parquet") + .option("checkpointLocation", "path/to/checkpoint/dir") + .option("path", "path/to/destination/dir") .start() // ========== DF with aggregation ========== @@ -1066,7 +1068,9 @@ noAggDF // Write new data to Parquet files noAggDF .writeStream() - .parquet("path/to/destination/directory") + .format("parquet") + .option("checkpointLocation", "path/to/checkpoint/dir") + .option("path", "path/to/destination/dir") .start(); // ========== DF with aggregation ========== @@ -1106,7 +1110,9 @@ noAggDF \ # Write new data to Parquet files noAggDF \ .writeStream() \ - .parquet("path/to/destination/directory") \ + .format("parquet") \ + .option("checkpointLocation", "path/to/checkpoint/dir") \ + .option("path", "path/to/destination/dir") \ .start() # ========== DF with aggregation ========== @@ -1120,11 +1126,11 @@ aggDF \ .start() # Have all the aggregates in an in memory table. The query name will be the table name -aggDF\ - .writeStream()\ - .queryName("aggregates")\ - .outputMode("complete")\ - .format("memory")\ +aggDF \ + .writeStream() \ + .queryName("aggregates") \ + .outputMode("complete") \ + .format("memory") \ .start() spark.sql("select * from aggregates").show() # interactively query in-memory table @@ -1159,7 +1165,9 @@ The `StreamingQuery` object created when a query is started can be used to monit {% highlight scala %} val query = df.writeStream.format("console").start() // get the query object -query.id // get the unique identifier of the running query +query.id // get the unique identifier of the running query that persists across restarts from checkpoint data + +query.runId // get the unique id of this run of the query, which will be generated at every start/restart query.name // get the name of the auto-generated or user-specified name @@ -1169,11 +1177,11 @@ query.stop() // stop the query query.awaitTermination() // block until query is terminated, with stop() or with error -query.exception() // the exception if the query has been terminated with error +query.exception // the exception if the query has been terminated with error -query.sourceStatus() // progress information about data has been read from the input sources +query.recentProgress // an array of the most recent progress updates for this query -query.sinkStatus() // progress information about data written to the output sink +query.lastProgress // the most recent progress update of this streaming query {% endhighlight %} @@ -1183,21 +1191,23 @@ query.sinkStatus() // progress information about data written to the output si {% highlight java %} StreamingQuery query = df.writeStream().format("console").start(); // get the query object -query.id(); // get the unique identifier of the running query +query.id(); // get the unique identifier of the running query that persists across restarts from checkpoint data + +query.runId(); // get the unique id of this run of the query, which will be generated at every start/restart query.name(); // get the name of the auto-generated or user-specified name query.explain(); // print detailed explanations of the query -query.stop(); // stop the query +query.stop(); // stop the query query.awaitTermination(); // block until query is terminated, with stop() or with error -query.exception(); // the exception if the query has been terminated with error +query.exception(); // the exception if the query has been terminated with error -query.sourceStatus(); // progress information about data has been read from the input sources +query.recentProgress(); // an array of the most recent progress updates for this query -query.sinkStatus(); // progress information about data written to the output sink +query.lastProgress(); // the most recent progress update of this streaming query {% endhighlight %} @@ -1207,7 +1217,9 @@ query.sinkStatus(); // progress information about data written to the output s {% highlight python %} query = df.writeStream().format("console").start() # get the query object -query.id() # get the unique identifier of the running query +query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data + +query.runId() # get the unique id of this run of the query, which will be generated at every start/restart query.name() # get the name of the auto-generated or user-specified name @@ -1217,11 +1229,11 @@ query.stop() # stop the query query.awaitTermination() # block until query is terminated, with stop() or with error -query.exception() # the exception if the query has been terminated with error +query.exception() # the exception if the query has been terminated with error -query.sourceStatus() # progress information about data has been read from the input sources +query.recentProgress() # an array of the most recent progress updates for this query -query.sinkStatus() # progress information about data written to the output sink +query.lastProgress() # the most recent progress update of this streaming query {% endhighlight %} @@ -1491,14 +1503,17 @@ spark.streams.addListener(new StreamingQueryListener() { {% highlight java %} SparkSession spark = ... -spark.streams.addListener(new StreamingQueryListener() { - @Overrides void onQueryStarted(QueryStartedEvent queryStarted) { +spark.streams().addListener(new StreamingQueryListener() { + @Override + public void onQueryStarted(QueryStartedEvent queryStarted) { System.out.println("Query started: " + queryStarted.id()); } - @Overrides void onQueryTerminated(QueryTerminatedEvent queryTerminated) { + @Override + public void onQueryTerminated(QueryTerminatedEvent queryTerminated) { System.out.println("Query terminated: " + queryTerminated.id()); } - @Overrides void onQueryProgress(QueryProgressEvent queryProgress) { + @Override + public void onQueryProgress(QueryProgressEvent queryProgress) { System.out.println("Query made progress: " + queryProgress.progress()); } }); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
