This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.10
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

commit 9d1dd88f75f27839519cf8c15b1f6def62536c1f
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Mon Aug 9 11:53:55 2021 +0800

    [ZEPPELIN-5484]. Update flink doc
    
    ### What is this PR for?
    
    Add more content for flink interpreter.
    
    ### What type of PR is it?
    [Documentation ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5484
    
    ### How should this be tested?
    * No test needed
    
    ### Screenshots (if appropriate)
    
![image](https://user-images.githubusercontent.com/164491/128988613-47f4ae08-0a93-4af6-8536-ecfba9333a60.png)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #4202 from zjffdu/ZEPPELIN-5484 and squashes the following commits:
    
    41beb1bdfb [Jeff Zhang] [ZEPPELIN-5484] Update flink doc
    
    (cherry picked from commit 3b5d4c2c674cffd2dcc2cfd9f8ffe9c2f2997b5c)
---
 .../zeppelin/img/docs-img/flink_append_mode.gif    | Bin 294307 -> 283669 bytes
 .../zeppelin/img/docs-img/flink_architecture.png   | Bin 0 -> 71228 bytes
 .../img/docs-img/flink_docker_tutorial.gif         | Bin 0 -> 1688852 bytes
 .../img/docs-img/flink_scala_codecompletion.png    | Bin 0 -> 24687 bytes
 .../zeppelin/img/docs-img/flink_sql_comment.png    | Bin 0 -> 115812 bytes
 .../zeppelin/img/docs-img/flink_sql_jobname.png    | Bin 0 -> 37968 bytes
 .../img/docs-img/flink_sql_multiple_insert.png     | Bin 0 -> 64647 bytes
 .../img/docs-img/flink_sql_parallelism.png         | Bin 0 -> 36481 bytes
 .../img/docs-img/flink_streaming_wordcount.png     | Bin 0 -> 159732 bytes
 .../zeppelin/img/docs-img/flink_udf_jars.png       | Bin 0 -> 52702 bytes
 .../zeppelin/img/docs-img/flink_update_mode.gif    | Bin 131055 -> 205217 bytes
 docs/interpreter/flink.md                          | 594 ++++++++++++++++-----
 12 files changed, 446 insertions(+), 148 deletions(-)

diff --git a/docs/assets/themes/zeppelin/img/docs-img/flink_append_mode.gif 
b/docs/assets/themes/zeppelin/img/docs-img/flink_append_mode.gif
index 3c827f4..dd4d1da 100644
Binary files a/docs/assets/themes/zeppelin/img/docs-img/flink_append_mode.gif 
and b/docs/assets/themes/zeppelin/img/docs-img/flink_append_mode.gif differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/flink_architecture.png 
b/docs/assets/themes/zeppelin/img/docs-img/flink_architecture.png
new file mode 100644
index 0000000..6a2a6e9
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/flink_architecture.png differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/flink_docker_tutorial.gif 
b/docs/assets/themes/zeppelin/img/docs-img/flink_docker_tutorial.gif
new file mode 100644
index 0000000..aa53c5b
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/flink_docker_tutorial.gif differ
diff --git 
a/docs/assets/themes/zeppelin/img/docs-img/flink_scala_codecompletion.png 
b/docs/assets/themes/zeppelin/img/docs-img/flink_scala_codecompletion.png
new file mode 100644
index 0000000..6b6dcda
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/flink_scala_codecompletion.png differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/flink_sql_comment.png 
b/docs/assets/themes/zeppelin/img/docs-img/flink_sql_comment.png
new file mode 100644
index 0000000..6d866ac
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/flink_sql_comment.png differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/flink_sql_jobname.png 
b/docs/assets/themes/zeppelin/img/docs-img/flink_sql_jobname.png
new file mode 100644
index 0000000..9f8e2f4
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/flink_sql_jobname.png differ
diff --git 
a/docs/assets/themes/zeppelin/img/docs-img/flink_sql_multiple_insert.png 
b/docs/assets/themes/zeppelin/img/docs-img/flink_sql_multiple_insert.png
new file mode 100644
index 0000000..5eaa4ac
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/flink_sql_multiple_insert.png differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/flink_sql_parallelism.png 
b/docs/assets/themes/zeppelin/img/docs-img/flink_sql_parallelism.png
new file mode 100644
index 0000000..260686c
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/flink_sql_parallelism.png differ
diff --git 
a/docs/assets/themes/zeppelin/img/docs-img/flink_streaming_wordcount.png 
b/docs/assets/themes/zeppelin/img/docs-img/flink_streaming_wordcount.png
new file mode 100644
index 0000000..4b1168b
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/flink_streaming_wordcount.png differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/flink_udf_jars.png 
b/docs/assets/themes/zeppelin/img/docs-img/flink_udf_jars.png
new file mode 100644
index 0000000..c5431b4
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/flink_udf_jars.png differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/flink_update_mode.gif 
b/docs/assets/themes/zeppelin/img/docs-img/flink_update_mode.gif
index fe7e2e9..29e3820 100644
Binary files a/docs/assets/themes/zeppelin/img/docs-img/flink_update_mode.gif 
and b/docs/assets/themes/zeppelin/img/docs-img/flink_update_mode.gif differ
diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 01ea99e..77e4b99 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -24,7 +24,8 @@ limitations under the License.
 <div id="toc"></div>
 
 ## Overview
-[Apache Flink](https://flink.apache.org) is an open source platform for 
distributed stream and batch data processing. Flink’s core is a streaming 
dataflow engine that provides data distribution, communication, and fault 
tolerance for distributed computations over data streams. Flink also builds 
batch processing on top of the streaming engine, overlaying native iteration 
support, managed memory, and program optimization.
+[Apache Flink](https://flink.apache.org) is a framework and distributed 
processing engine for stateful computations over unbounded and bounded data 
streams. 
+Flink has been designed to run in all common cluster environments, perform 
computations at in-memory speed and at any scale.
 
 In Zeppelin 0.9, we refactor the Flink interpreter in Zeppelin to support the 
latest version of Flink. **Only Flink 1.10+ is supported, old versions of flink 
won't work.**
 Apache Flink is supported in Zeppelin with the Flink interpreter group which 
consists of the five interpreters listed below.
@@ -62,13 +63,106 @@ Apache Flink is supported in Zeppelin with the Flink 
interpreter group which con
   </tr>
 </table>
 
+## Main Features
+
+<table class="table-configuration">
+  <tr>
+    <th>Feature</th>
+    <th>Description</th>
+  </tr>
+  <tr>
+    <td>Support multiple versions of Flink</td>
+    <td>You can run different versions of Flink in one Zeppelin instance</td>
+  </tr>
+  <tr>
+    <td>Support multiple versions of Scala</td>
+    <td>You can run different Scala versions of Flink in on Zeppelin 
instance</td>
+  </tr>
+  <tr>
+    <td>Support multiple languages</td>
+    <td>Scala, Python, SQL are supported, besides that you can also 
collaborate across languages, e.g. you can write Scala UDF and use it in 
PyFlink</td>
+  </tr>
+  <tr>
+    <td>Support multiple execution modes</td>
+    <td>Local | Remote | Yarn | Yarn Application</td>
+  </tr>
+  <tr>
+    <td>Support Hive</td>
+    <td>Hive catalog is supported</td>
+  </tr>
+  <tr>
+    <td>Interactive development</td>
+    <td>Interactive development user experience increase your productivity</td>
+  </tr>
+  <tr>
+    <td>Enhancement on Flink SQL</td>
+    <td>* Support both streaming sql and batch sql in one notebook <br/>
+* Support sql comment (single line comment/multiple line comment) <br/>
+* Support advanced configuration (jobName, parallelism) <br/>
+* Support multiple insert statements 
+  </td>
+  </tr>
+    <td>Multi-tenancy</td>
+    <td>Multiple user can work in one Zeppelin instance without affecting each 
other.</td>
+  </tr>
+
+  </tr>
+    <td>Rest API Support</td>
+    <td>You can not only submit Flink job via Zeppelin notebook UI, but also 
can do that via its rest api (You can use Zeppelin as Flink job server).</td>
+  </tr>
+</table>
+
+## Play Flink in Zeppelin docker
+
+For beginner, we would suggest you to play Flink in Zeppelin docker. 
+First you need to download Flink, because there's no Flink binary distribution 
shipped with Zeppelin. 
+e.g. Here we download Flink 1.12.2 to`/mnt/disk1/flink-1.12.2`,
+and we mount it to Zeppelin docker container and run the following command to 
start Zeppelin docker.
+
+```bash
+docker run -u $(id -u) -p 8080:8080 --rm -v /mnt/disk1/flink-1.12.2:/opt/flink 
-e FLINK_HOME=/opt/flink  --name zeppelin apache/zeppelin:0.10.0
+```
+
+After running the above command, you can open `http://localhost:8080` to play 
Flink in Zeppelin. We only verify the flink local mode in Zeppelin docker, 
other modes may not due to network issues.
+
+Here's screenshot of running note `Flink Tutorial/5. Streaming Data Analytics`
+
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_docker_tutorial.gif">
+
+
+You can also mount notebook folder to replace the built-in zeppelin tutorial 
notebook. 
+e.g. Here's a repo of Flink sql cookbook on Zeppelin: 
[https://github.com/zjffdu/flink-sql-cookbook-on-zeppelin/](https://github.com/zjffdu/flink-sql-cookbook-on-zeppelin/)
+
+You can clone this repo and mount it to docker,
+
+```
+docker run -u $(id -u) -p 8080:8080 --rm -v 
/mnt/disk1/flink-sql-cookbook-on-zeppelin:/notebook -v 
/mnt/disk1/flink-1.12.2:/opt/flink -e FLINK_HOME=/opt/flink  -e 
ZEPPELIN_NOTEBOOK_DIR='/notebook' --name zeppelin apache/zeppelin:0.10.0
+```
+
 ## Prerequisites
 
-* Download Flink 1.10 for scala 2.11 (Only scala-2.11 is supported, scala-2.12 
is not supported yet in Zeppelin)
+Download Flink 1.10 or afterwards (Scala 2.11 & 2.12 are both supported)
+
+
+## Flink on Zeppelin Architecture
+
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_architecture.png">
+
+The above diagram is the architecture of Flink on Zeppelin. Flink interpreter 
on the left side is actually a Flink client 
+which is responsible for compiling and managing Flink job lifecycle, such as 
submit, cancel job, 
+monitoring job progress and so on. The Flink cluster on the right side is the 
place where executing Flink job. 
+It could be a MiniCluster (local mode), Standalone cluster (remote mode), 
+Yarn session cluster (yarn mode) or Yarn application session cluster 
(yarn-application mode)
+
+There are 2 important components in Flink interpreter: Scala shell & Python 
shell
+
+* Scala shell is the entry point of Flink interpreter, it would create all the 
entry points of Flink program, such as 
ExecutionEnvironment,StreamExecutionEnvironment and TableEnvironment. Scala 
shell is responsible for compiling and running Scala code and sql.
+* Python shell is the entry point of PyFlink, it is responsible for compiling 
and running Python code.
 
 ## Configuration
+
 The Flink interpreter can be configured with properties provided by Zeppelin 
(as following table).
-You can also add and set other flink properties which are not listed in the 
table. For a list of additional properties, refer to [Flink Available 
Properties](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html).
+You can also add and set other Flink properties which are not listed in the 
table. For a list of additional properties, refer to [Flink Available 
Properties](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html).
 <table class="table-configuration">
   <tr>
     <th>Property</th>
@@ -78,7 +172,7 @@ You can also add and set other flink properties which are 
not listed in the tabl
   <tr>
     <td>`FLINK_HOME`</td>
     <td></td>
-    <td>Location of flink installation. It is must be specified, otherwise you 
can not use flink in Zeppelin</td>
+    <td>Location of Flink installation. It is must be specified, otherwise you 
can not use Flink in Zeppelin</td>
   </tr>
   <tr>
     <td>`HADOOP_CONF_DIR`</td>
@@ -93,7 +187,7 @@ You can also add and set other flink properties which are 
not listed in the tabl
   <tr>
     <td>flink.execution.mode</td>
     <td>local</td>
-    <td>Execution mode of flink, e.g. local | yarn | remote</td>
+    <td>Execution mode of Flink, e.g. local | remote | yarn | 
yarn-application</td>
   </tr>
   <tr>
     <td>flink.execution.remote.host</td>
@@ -108,12 +202,12 @@ You can also add and set other flink properties which are 
not listed in the tabl
   <tr>
     <td>jobmanager.memory.process.size</td>
     <td>1024m</td>
-    <td>Total number of memory of JobManager, e.g. 1024m. It is official 
[flink 
property](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/)</td>
+    <td>Total memory size of JobManager, e.g. 1024m. It is official [Flink 
property](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/)</td>
   </tr>
   <tr>
     <td>taskmanager.memory.process.size</td>
     <td>1024m</td>
-    <td>Total number of memory of TaskManager, e.g. 1024m. It is official 
[flink 
property](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/)</td>
+    <td>Total memory size of TaskManager, e.g. 1024m. It is official [Flink 
property](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/)</td>
   </tr>
   <tr>
     <td>taskmanager.numberOfTaskSlots</td>
@@ -138,32 +232,32 @@ You can also add and set other flink properties which are 
not listed in the tabl
   <tr>
     <td>zeppelin.flink.uiWebUrl</td>
     <td></td>
-    <td>User specified Flink JobManager url, it could be used in remote mode 
where Flink cluster is already started, or could be used as url template, e.g. 
https://knox-server:8443/gateway/cluster-topo/yarn/proxy/{{applicationId}}/ 
where {{applicationId}} would be replaced with yarn app id</td>
+    <td>User specified Flink JobManager url, it could be used in remote mode 
where Flink cluster is already started, or could be used as url template, e.g. 
https://knox-server:8443/gateway/cluster-topo/yarn/proxy/{% raw 
%}{{applicationId}}{% endraw %}/ where {% raw %}{{applicationId}}{% endraw %} 
is placeholder of yarn app id</td>
   </tr>
   <tr>
     <td>zeppelin.flink.run.asLoginUser</td>
     <td>true</td>
-    <td>Whether run flink job as the zeppelin login user, it is only applied 
when running flink job in hadoop yarn cluster and shiro is enabled</td>
+    <td>Whether run Flink job as the Zeppelin login user, it is only applied 
when running Flink job in hadoop yarn cluster and shiro is enabled</td>
   </tr> 
   <tr>
     <td>flink.udf.jars</td>
     <td></td>
-    <td>Flink udf jars (comma separated), zeppelin will register udf in this 
jar automatically for user. These udf jars could be either local files or hdfs 
files if you have hadoop installed. The udf name is the class name.</td>
+    <td>Flink udf jars (comma separated), Zeppelin will register udf in these 
jars automatically for user. These udf jars could be either local files or hdfs 
files if you have hadoop installed. The udf name is the class name.</td>
   </tr>
   <tr>
     <td>flink.udf.jars.packages</td>
     <td></td>
-    <td>Packages (comma separated) that would be searched for the udf defined 
in `flink.udf.jars`.</td>
+    <td>Packages (comma separated) that would be searched for the udf defined 
in `flink.udf.jars`. Specifying this can reduce the number of classes to scan, 
otherwise all the classes in udf jar will be scanned.</td>
   </tr>
   <tr>
     <td>flink.execution.jars</td>
     <td></td>
-    <td>Additional user jars (comma separated), these jars could be either 
local files or hdfs files if you have hadoop installed.</td>
+    <td>Additional user jars (comma separated), these jars could be either 
local files or hdfs files if you have hadoop installed. It can be used to 
specify Flink connector jars or udf jars (no udf class auto-registration like 
`flink.udf.jars`)</td>
   </tr>
   <tr>
     <td>flink.execution.packages</td>
     <td></td>
-    <td>Additional user packages (comma separated), e.g. 
org.apache.flink:flink-connector-kafka_2.11:1.10,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0</td>
+    <td>Additional user packages (comma separated), e.g. 
`org.apache.flink:flink-json:1.10.0`</td>
   </tr>
   <tr>
     <td>zeppelin.flink.concurrentBatchSql.max</td>
@@ -183,12 +277,12 @@ You can also add and set other flink properties which are 
not listed in the tabl
   <tr>
     <td>table.exec.resource.default-parallelism</td>
     <td>1</td>
-    <td>Default parallelism for flink sql job</td>
+    <td>Default parallelism for Flink sql job</td>
   </tr>
   <tr>
     <td>zeppelin.flink.scala.color</td>
     <td>true</td>
-    <td>Whether display scala shell output in colorful format</td>
+    <td>Whether display Scala shell output in colorful format</td>
   </tr>
 
   <tr>
@@ -204,7 +298,7 @@ You can also add and set other flink properties which are 
not listed in the tabl
   <tr>
     <td>zeppelin.flink.module.enableHive</td>
     <td>false</td>
-    <td>Whether enable hive module, hive udf take precedence over flink udf if 
hive module is enabled.</td>
+    <td>Whether enable hive module, hive udf take precedence over Flink udf if 
hive module is enabled.</td>
   </tr>
   <tr>
     <td>zeppelin.flink.maxResult</td>
@@ -212,157 +306,329 @@ You can also add and set other flink properties which 
are not listed in the tabl
     <td>max number of row returned by sql interpreter</td>
   </tr>
   <tr>
+    <td>`zeppelin.flink.job.check_interval`</td>
+    <td>1000</td>
+    <td>Check interval (in milliseconds) to check Flink job progress</td>
+  </tr>
+  <tr>
     <td>`flink.interpreter.close.shutdown_cluster`</td>
     <td>true</td>
-    <td>Whether shutdown application when closing interpreter</td>
+    <td>Whether shutdown Flink cluster when closing interpreter</td>
   </tr>
   <tr>
     <td>`zeppelin.interpreter.close.cancel_job`</td>
     <td>true</td>
-    <td>Whether cancel flink job when closing interpreter</td>
-  </tr>
-  <tr>
-    <td>`zeppelin.flink.job.check_interval`</td>
-    <td>1000</td>
-    <td>Check interval (in milliseconds) to check flink job progress</td>
+    <td>Whether cancel Flink job when closing interpreter</td>
   </tr>
 </table>
 
 
-## StreamExecutionEnvironment, ExecutionEnvironment, StreamTableEnvironment, 
BatchTableEnvironment
+## Interpreter Binding Mode
 
-Zeppelin will create 6 variables as flink scala (`%flink`) entry point:
+The default [interpreter binding 
mode](../usage/interpreter/interpreter_binding_mode.html) is `globally shared`. 
That means all notes share the same Flink interpreter which means they share 
the same Flink cluster.
+In practice, we would recommend you to use `isolated per note` which means 
each note has own Flink interpreter without affecting each other (Each one has 
his own Flink cluster). 
 
-* `senv`    (StreamExecutionEnvironment), 
-* `benv`     (ExecutionEnvironment)
-* `stenv`   (StreamTableEnvironment for blink planner) 
-* `btenv`   (BatchTableEnvironment for blink planner)
-* `stenv_2`   (StreamTableEnvironment for flink planner) 
-* `btenv_2`   (BatchTableEnvironment for flink planner)
 
-And will create 6 variables as pyflink (`%flink.pyflink` or `%flink.ipyflink`) 
entry point:
+## Execution Mode
 
-* `s_env`    (StreamExecutionEnvironment), 
-* `b_env`     (ExecutionEnvironment)
-* `st_env`   (StreamTableEnvironment for blink planner) 
-* `bt_env`   (BatchTableEnvironment for blink planner)
-* `st_env_2`   (StreamTableEnvironment for flink planner) 
-* `bt_env_2`   (BatchTableEnvironment for flink planner)
+Flink in Zeppelin supports 4 execution modes (`flink.execution.mode`):
+
+* Local
+* Remote
+* Yarn
+* Yarn Application
+
+### Local Mode
+
+Running Flink in local mode will start a MiniCluster in local JVM. By default, 
the local MiniCluster use port 8081, so make sure this port is available in 
your machine,
+otherwise you can configure `rest.port` to specify another port. You can also 
specify `local.number-taskmanager` and `flink.tm.slot` to customize the number 
of TM and number of slots per TM.
+Because by default it is only 4 TM with 1 slot in this MiniCluster which may 
not be enough for some cases.
+
+### Remote Mode
+
+Running Flink in remote mode will connect to an existing Flink cluster which 
could be standalone cluster or yarn session cluster. Besides specifying 
`flink.execution.mode` to be `remote`, you also need to specify
+`flink.execution.remote.host` and `flink.execution.remote.port` to point to 
Flink job manager's rest api address.
+
+### Yarn Mode
+
+In order to run Flink in Yarn mode, you need to make the following settings:
+
+* Set `flink.execution.mode` to be `yarn`
+* Set `HADOOP_CONF_DIR` in Flink's interpreter setting or `zeppelin-env.sh`.
+* Make sure `hadoop` command is on your `PATH`. Because internally Flink will 
call command `hadoop classpath` and load all the hadoop related jars in the 
Flink interpreter process
+
+In this mode, Zeppelin would launch a Flink yarn session cluster for you and 
destroy it when you shutdown your Flink interpreter.
+
+### Yarn Application Mode
+
+In the above yarn mode, there will be a separated Flink interpreter process on 
the Zeppelin server host. However, this may run out of resources when there are 
too many interpreter processes.
+So in practise, we would recommend you to use yarn application mode if you are 
using Flink 1.11 or afterwards (yarn application mode is only supported after 
Flink 1.11). 
+In this mode Flink interpreter runs in the JobManager which is in yarn 
container.
+In order to run Flink in yarn application mode, you need to make the following 
settings:
+
+* Set `flink.execution.mode` to be `yarn-application`
+* Set `HADOOP_CONF_DIR` in Flink's interpreter setting or `zeppelin-env.sh`.
+* Make sure `hadoop` command is on your `PATH`. Because internally flink will 
call command `hadoop classpath` and load all the hadoop related jars in Flink 
interpreter process
 
-## Blink/Flink Planner
 
-There are 2 planners supported by Flink's table api: `flink` & `blink`.
+## Flink Scala
 
-* If you want to use DataSet api, and convert it to flink table then please 
use flink planner (`btenv_2` and `stenv_2`).
-* In other cases, we would always recommend you to use `blink` planner. This 
is also what flink batch/streaming sql interpreter use (`%flink.bsql` & 
`%flink.ssql`)
+Scala is the default language of Flink on Zeppelin(`%flink`), and it is also 
the entry point of Flink interpreter. Underneath Flink interpreter will create 
Scala shell 
+which would create several built-in variables, including 
ExecutionEnvironment,StreamExecutionEnvironment and so on. 
+So don't create these Flink environment variables again, otherwise you might 
hit weird issues. The Scala code you write in Zeppelin will be submitted to 
this Scala shell.  
+Here are the builtin variables created in Flink Scala shell.
+
+* senv (StreamExecutionEnvironment),
+* benv (ExecutionEnvironment)
+* stenv (StreamTableEnvironment for blink planner)
+* btenv (BatchTableEnvironment for blink planner)
+* stenv_2 (StreamTableEnvironment for flink planner)
+* btenv_2 (BatchTableEnvironment for flink planner)
+* z  (ZeppelinContext)
+
+### Blink/Flink Planner
+
+There are 2 planners supported by Flink SQL: `flink` & `blink`.
+
+* If you want to use DataSet api, and convert it to Flink table then please 
use `flink` planner (`btenv_2` and `stenv_2`).
+* In other cases, we would always recommend you to use `blink` planner. This 
is also what Flink batch/streaming sql interpreter use (`%flink.bsql` & 
`%flink.ssql`)
 
 Check this 
[page](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#main-differences-between-the-two-planners)
 for the difference between flink planner and blink planner.
 
 
-## Execution mode (Local/Remote/Yarn/Yarn Application)
+### Stream WordCount Example
 
-Flink in Zeppelin supports 4 execution modes (`flink.execution.mode`):
+You can write whatever Scala code in Zeppelin. 
 
-* Local
-* Remote
-* Yarn
-* Yarn Application
+e.g. in the following example, we write a classical streaming wordcount 
example.
 
-### Run Flink in Local Mode
 
-Running Flink in Local mode will start a MiniCluster in local JVM. By default, 
the local MiniCluster will use port 8081, so make sure this port is available 
in your machine,
-otherwise you can configure `rest.port` to specify another port. You can also 
specify `local.number-taskmanager` and `flink.tm.slot` to customize the number 
of TM and number of slots per TM, 
-because by default it is only 4 TM with 1 Slots which may not be enough for 
some cases.
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_streaming_wordcount.png"
 width="80%">
 
-### Run Flink in Remote Mode
 
-Running Flink in remote mode will connect to an existing flink cluster which 
could be standalone cluster or yarn session cluster. Besides specifying 
`flink.execution.mode` to be `remote`. You also need to specify
-`flink.execution.remote.host` and `flink.execution.remote.port` to point to 
flink job manager.
+### Code Completion
 
-### Run Flink in Yarn Mode
+You can type tab for code completion.
 
-In order to run flink in Yarn mode, you need to make the following settings:
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_scala_codecompletion.png"
 width="80%">
 
-* Set `flink.execution.mode` to `yarn`
-* Set `HADOOP_CONF_DIR` in flink's interpreter setting or `zeppelin-env.sh`.
-* Make sure `hadoop` command is on your PATH. Because internally flink will 
call command `hadoop classpath` and load all the hadoop related jars in the 
flink interpreter process
+### ZeppelinContext
 
-### Run Flink in Yarn Application Mode
+`ZeppelinContext` provides some additional functions and utilities.
+See [Zeppelin-Context](../usage/other_features/zeppelin_context.html) for more 
details. 
+For Flink interpreter, you can use `z` to display Flink `Dataset/Table`. 
 
-In the above yarn mode, there will be a separated flink interpreter process. 
This may run out of resources when there're many interpreter processes.
-So it is recommended to use yarn application mode if you are using flink 1.11 
or afterwards (yarn application mode is only supported after flink 1.11). In 
this mode flink interpreter runs in the JobManager which is in yarn container.
-In order to run flink in yarn application mode, you need to make the following 
settings:
+e.g. you can use `z.show` to display DataSet, Batch Table, Stream Table.
 
-* Set `flink.execution.mode` to `yarn-application`
-* Set `HADOOP_CONF_DIR` in flink's interpreter setting or `zeppelin-env.sh`.
-* Make sure `hadoop` command is on your PATH. Because internally flink will 
call command `hadoop classpath` and load all the hadoop related jars in the 
flink interpreter process
+* z.show(DataSet)
 
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_z_dataset.png">
 
-## How to use Hive
 
-In order to use Hive in Flink, you have to make the following setting.
+* z.show(Batch Table)
 
-* Set `zeppelin.flink.enableHive` to be true
-* Set `zeppelin.flink.hive.version` to be the hive version you are using.
-* Set `HIVE_CONF_DIR` to be the location where `hive-site.xml` is located. 
Make sure hive metastore is started and you have configured 
`hive.metastore.uris` in `hive-site.xml`
-* Copy the following dependencies to the lib folder of flink installation. 
-    * flink-connector-hive_2.11–1.10.0.jar
-    * flink-hadoop-compatibility_2.11–1.10.0.jar
-    * hive-exec-2.x.jar (for hive 1.x, you need to copy hive-exec-1.x.jar, 
hive-metastore-1.x.jar, libfb303–0.9.2.jar and libthrift-0.9.2.jar)
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_z_batch_table.png">
+
+
+* z.show(Stream Table)
+
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_z_stream_table.gif">
+
+
+## Flink SQL
+
+In Zeppelin, there are 2 kinds of Flink sql interpreter you can use
+
+* `%flink.ssql`
+Streaming Sql interpreter which launch Flink streaming job via 
`StreamTableEnvironment`
+* `%flink.bsql`
+Batch Sql interpreter which launch Flink batch job via `BatchTableEnvironment`
+
+Flink Sql interpreter in Zeppelin is equal to Flink Sql-client + many other 
enhancement features.
+
+### Enhancement SQL Features
+
+#### Support batch SQL and streaming sql together.
 
+In Flink Sql-client, either you run streaming sql or run batch sql in one 
session. You can not run them together. 
+But in Zeppelin, you can do that. `%flink.ssql` is used for running streaming 
sql, while `%flink.bsql` is used for running batch sql. 
+Batch/Streaming Flink jobs run in the same Flink session cluster.
 
-## Flink Batch SQL
+#### Support multiple statements
 
-`%flink.bsql` is used for flink's batch sql. You can type `help` to get all 
the available commands. 
-It supports all the flink sql, including DML/DDL/DQL.
+You can write multiple sql statements in one paragraph, each sql statement is 
separated by semicolon. 
 
-* Use `insert into` statement for batch ETL
-* Use `select` statement for batch data analytics 
+#### Comment support
 
-## Flink Streaming SQL
+2 kinds of sql comments are supported in Zeppelin:
 
-`%flink.ssql` is used for flink's streaming sql. You just type `help` to get 
all the available commands. 
-It supports all the flink sql, including DML/DDL/DQL.
+* Single line comment start with `--`
+* Multiple line comment around with `/* */`
 
-* Use `insert into` statement for streaming ETL
-* Use `select` statement for streaming data analytics
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_sql_comment.png">
 
-## Streaming Data Visualization
 
-Zeppelin supports 3 types of streaming data analytics:
+#### Job parallelism setting
+
+You can set the sql parallelism via paragraph local property: `parallelism`
+
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_sql_parallelism.png">
+
+#### Support multiple insert
+
+Sometimes you have multiple insert statements which read the same source, 
+but write to different sinks. By default, each insert statement would launch a 
separated Flink job, 
+but you can set paragraph local property: `runAsOne` to be `true` to run them 
in one single Flink job.
+
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_sql_multiple_insert.png">
+
+#### Set job name
+
+You can set Flink job name for insert statement via setting paragraph local 
property: `jobName`. To be noticed, 
+you can only set job name for insert statement. Select statement is not 
supported yet. 
+And this kind of setting only works for single insert statement. It doesn't 
work for multiple insert we mentioned above.
+
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_sql_jobname.png">
+
+### Streaming Data Visualization
+
+Zeppelin can visualize the select sql result of Flink streaming job. Overall 
it supports 3 modes:
+
 * Single
 * Update
 * Append
 
-### type=single
-Single mode is for the case when the result of sql statement is always one 
row, such as the following example. The output format is HTML, 
+#### Single Mode
+
+Single mode is for the case when the result of sql statement is always one 
row, 
+such as the following example. The output format is HTML, 
 and you can specify paragraph local property `template` for the final output 
content template. 
-And you can use `{i}` as placeholder for the ith column of result.
+You can use `{i}` as placeholder for the `ith` column of result.
+
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_single_mode.gif">
 
- <center>
-   ![Interactive 
Help]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_single_mode.gif)
- </center>
 
-### type=update
-Update mode is suitable for the case when the output is more than one rows, 
and always will be updated continuously. 
+#### Update Mode
+
+Update mode is suitable for the case when the output is more than one rows, 
and will always be updated continuously. 
 Here’s one example where we use group by.
 
- <center>
-   ![Interactive 
Help]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_update_mode.gif)
- </center>
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_update_mode.gif">
+
+#### Append Mode
+
+Append mode is suitable for the scenario where output data is always appended. 
+E.g. the following example which use tumble window.
+
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_append_mode.gif">
+
+## PyFlink
+
+PyFlink is Python entry point of  Flink on Zeppelin, internally Flink 
interpreter will create Python shell which
+would create Flink's environment variables (including ExecutionEnvironment, 
StreamExecutionEnvironment and so on).
+To be noticed, the java environment behind Pyflink is created in Scala shell.
+That means underneath Scala shell and Python shell share the same environment.
+These are variables created in Python shell.
+
+* `s_env`    (StreamExecutionEnvironment),
+* `b_env`     (ExecutionEnvironment)
+* `st_env`   (StreamTableEnvironment for blink planner)
+* `bt_env`   (BatchTableEnvironment for blink planner)
+* `st_env_2`   (StreamTableEnvironment for flink planner)
+* `bt_env_2`   (BatchTableEnvironment for flink planner)
+
+
+### Configure PyFlink
+
+There are 3 things you need to configure to make Pyflink work in Zeppelin.
+
+* Install pyflink
+  e.g. ( pip install apache-flink==1.11.1 ).
+  If you need to use Pyflink udf, then you to install pyflink on all the task 
manager nodes. That means if you are using yarn, then all the yarn nodes need 
to install pyflink.
+* Copy `python` folder under `${FLINK_HOME}/opt` to `${FLINK_HOME/lib`.
+* Set `zeppelin.pyflink.python` as the python executable path. By default, it 
is the python in `PATH`. In case you have multiple versions of python 
installed, you need to configure `zeppelin.pyflink.python` as the python 
version you want to use.
+
+### How to use PyFlink
+
+There are 2 ways to use PyFlink in Zeppelin
+
+* `%flink.pyflink`
+* `%flink.ipyflink`
+
+`%flink.pyflink` is much simple and easy,  you don't need to do anything 
except the above setting,
+but its function is also limited. We suggest you to use `%flink.ipyflink` 
which provides almost the same user experience like jupyter.
+
+### Configure IPyFlink
+
+If you don't have anaconda installed, then you need to install the following 3 
libraries.
+
+```
+pip install jupyter
+pip install grpcio
+pip install protobuf
+```
+
+If you have anaconda installed, then you only need to install following 2 
libraries.
+
+```
+pip install grpcio
+pip install protobuf
+```
+
+`ZeppelinContext` is also available in PyFlink, you can use it almost the same 
as in Flink Scala.
+
+Check the [Python doc](python.html) for more features of IPython.
+
+
+## Third party dependencies
+
+It is very common to have third party dependencies when you write Flink job in 
whatever languages (Scala, Python, Sql).
+It is very easy to add dependencies in IDE (e.g. add dependency in pom.xml),
+but how can you do that in Zeppelin ? Mainly there are 2 settings you can use 
to add third party dependencies
+
+* flink.execution.packages
+* flink.execution.jars
+
+### flink.execution.packages
+
+This is the recommended way of adding dependencies. Its implementation is the 
same as adding
+dependencies in `pom.xml`. Underneath it would download all the packages and 
its transitive dependencies
+from maven repository, then put them on the classpath. Here's one example of 
how to add kafka connector of Flink 1.10 via [inline 
configuration](../usage/interpreter/overview.html#inline-generic-configuration).
+
+```
+%flink.conf
+
+flink.execution.packages  
org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0
+```
+
+The format is `artifactGroup:artifactId:version`, if you have multiple 
packages,
+then separate them with comma. `flink.execution.packages` requires internet 
accessible.
+So if you can not access internet, you need to use `flink.execution.jars` 
instead.
+
+### flink.execution.jars
+
+If your Zeppelin machine can not access internet or your dependencies are not 
deployed to maven repository,
+then you can use `flink.execution.jars` to specify the jar files you depend on 
(each jar file is separated with comma)
+
+Here's one example of how to add kafka dependencies(including kafka connector 
and its transitive dependencies) via `flink.execution.jars`
+
+```
+%flink.conf
+
+flink.execution.jars /usr/lib/flink-kafka/target/flink-kafka-1.0-SNAPSHOT.jar
+```
 
-### type=append
-Append mode is suitable for the scenario where output data is always appended. 
E.g. the following example which use tumble window.
 
- <center>
-   ![Interactive 
Help]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_append_mode.gif)
- </center>
- 
 ## Flink UDF
 
-You can use Flink scala UDF or Python UDF in sql. UDF for batch and streaming 
sql is the same. Here're 2 examples.
+There are 4 ways you can define UDF in Zeppelin.
+
+* Write Scala UDF
+* Write PyFlink UDF
+* Create UDF via SQL
+* Configure udf jar via flink.udf.jars
 
-* Scala UDF
+### Scala UDF
 
 ```scala
 %flink
@@ -370,11 +636,16 @@ You can use Flink scala UDF or Python UDF in sql. UDF for 
batch and streaming sq
 class ScalaUpper extends ScalarFunction {
   def eval(str: String) = str.toUpperCase
 }
-btenv.registerFunction("scala_upper", new ScalaUpper())
 
+btenv.registerFunction("scala_upper", new ScalaUpper())
 ```
 
-* Python UDF
+It is very straightforward to define scala udf almost the same as what you do 
in IDE.
+After creating udf class, you need to register it via `btenv`.
+You can also register it via `stenv` which share the same Catalog with `btenv`.
+
+
+### Python UDF
 
 ```python
 
@@ -387,54 +658,78 @@ class PythonUpper(ScalarFunction):
 bt_env.register_function("python_upper", udf(PythonUpper(), 
DataTypes.STRING(), DataTypes.STRING()))
 
 ```
+It is also very straightforward to define Python udf almost the same as what 
you do in IDE. 
+After creating udf class, you need to register it via `bt_env`. 
+You can also register it via `st_env` which share the same Catalog with 
`bt_env`.
 
-Zeppelin only supports scala and python for flink interpreter, if you want to 
write a java udf or the udf is pretty complicated which make it not suitable to 
write in Zeppelin,
-then you can write the udf in IDE and build an udf jar.
-In Zeppelin you just need to specify `flink.udf.jars` to this jar, and flink
-interpreter will detect all the udfs in this jar and register all the udfs to 
TableEnvironment, the udf name is the class name.
+### UDF via SQL
 
-## PyFlink(%flink.pyflink)
-In order to use PyFlink in Zeppelin, you just need to do the following 
configuration.
-* Install apache-flink (e.g. pip install apache-flink)
-* Set `zeppelin.pyflink.python` to the python executable where apache-flink is 
installed in case you have multiple python installed.
-* Copy flink-python_2.11–1.10.0.jar from flink opt folder to flink lib folder
+Some simple udf can be written in Zeppelin. But if the udf logic is very 
complicated, 
+then it is better to write it in IDE, then register it in Zeppelin as following
 
-And PyFlink will create 6 variables for you:
+```sql
+%flink.ssql
 
-* `s_env`    (StreamExecutionEnvironment), 
-* `b_env`     (ExecutionEnvironment)
-* `st_env`   (StreamTableEnvironment for blink planner) 
-* `bt_env`   (BatchTableEnvironment for blink planner)
-* `st_env_2`   (StreamTableEnvironment for flink planner) 
-* `bt_env_2`   (BatchTableEnvironment for flink planner)
+CREATE FUNCTION myupper AS 'org.apache.zeppelin.flink.udf.JavaUpper';
+```
+
+But this kind of approach requires the udf jar must be on `CLASSPATH`, 
+so you need to configure `flink.execution.jars` to include this udf jar on 
`CLASSPATH`, such as following:
+
+```
+%flink.conf
+
+flink.execution.jars /usr/lib/flink-udf-1.0-SNAPSHOT.jar
+```
+
+### flink.udf.jars
+
+The above 3 approaches all have some limitations:
+
+* It is suitable to write simple Scala udf or Python udf in Zeppelin, but not 
suitable to write very complicated udf in Zeppelin. Because notebook doesn't 
provide advanced features compared to IDE, such as package management, code 
navigation and etc.
+* It is not easy to share the udf between notes or users, you have to run the 
paragraph of defining udf in each flink interpreter.
+
+So when you have many udfs or udf logic is very complicated and you don't want 
to register them by yourself every time, then you can use `flink.udf.jars`
+
+* Step 1. Create a udf project in your IDE, write your udf there.
+* Step 2. Set `flink.udf.jars` to point to the udf jar you build from your udf 
project
+
+For example,
+
+```
+%flink.conf
+
+flink.execution.jars /usr/lib/flink-udf-1.0-SNAPSHOT.jar
+```
+
+Zeppelin would scan this jar, find out all the udf classes and then register 
them automatically for you. 
+The udf name is the class name. For example, here's the output of show 
functions after specifing the above udf jars in `flink.udf.jars`
+
+<img 
src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_udf_jars.png">
+
+By default, Zeppelin would scan all the classes in this jar, 
+so it would be pretty slow if your jar is very big specially when your udf jar 
has other dependencies. 
+So in this case we would recommend you to specify `flink.udf.jars.packages` to 
specify the package to scan,
+this can reduce the number of classes to scan and make the udf detection much 
faster.
+
+
+## How to use Hive
+
+In order to use Hive in Flink, you have to make the following settings.
+
+* Set `zeppelin.flink.enableHive` to be true
+* Set `zeppelin.flink.hive.version` to be the hive version you are using.
+* Set `HIVE_CONF_DIR` to be the location where `hive-site.xml` is located. 
Make sure hive metastore is started and you have configured 
`hive.metastore.uris` in `hive-site.xml`
+* Copy the following dependencies to the lib folder of flink installation.
+    * flink-connector-hive_2.11–*.jar
+    * flink-hadoop-compatibility_2.11–*.jar
+    * hive-exec-2.x.jar (for hive 1.x, you need to copy hive-exec-1.x.jar, 
hive-metastore-1.x.jar, libfb303–0.9.2.jar and libthrift-0.9.2.jar)
 
-### IPython Support(%flink.ipyflink)
-
-By default, zeppelin would use IPython in `%flink.pyflink` when IPython is 
available, Otherwise it would fall back to the original python implementation.
-For the IPython features, you can refer doc[Python Interpreter](python.html)
-
-## ZeppelinContext
-Zeppelin automatically injects `ZeppelinContext` as variable `z` in your 
Scala/Python environment. `ZeppelinContext` provides some additional functions 
and utilities.
-See [Zeppelin-Context](../usage/other_features/zeppelin_context.html) for more 
details. You can use `z` to display both flink DataSet and batch/stream table.
-
-* Display DataSet
- <center>
-   ![Interactive 
Help]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_z_dataset.png)
- </center>
- 
-* Display Batch Table
- <center>
-   ![Interactive 
Help]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_z_batch_table.png)
- </center>
-* Display Stream Table
- <center>
-   ![Interactive 
Help]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_z_stream_table.gif)
- </center>
 
 ## Paragraph local properties
 
 In the section of `Streaming Data Visualization`, we demonstrate the different 
visualization type via paragraph local properties: `type`. 
-In this section, we will list and explain all the supported local properties 
in flink interpreter.
+In this section, we will list and explain all the supported local properties 
in Flink interpreter.
 
 <table class="table-configuration">
   <tr>
@@ -498,5 +793,8 @@ In this section, we will list and explain all the supported 
local properties in
 
 Zeppelin is shipped with several Flink tutorial notes which may be helpful for 
you. You can check for more features in the tutorial notes.
 
+## Community
+
+[Join our community](http://zeppelin.apache.org/community.html) to discuss 
with others.
 
 

Reply via email to