This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new e34cf0f [ZEPPELIN-5428] Unify flink configuration between different execution modes (#4155) e34cf0f is described below commit e34cf0f6062643cb848c4da03aaa98635b261e77 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Sat Jul 3 13:22:07 2021 +0800 [ZEPPELIN-5428] Unify flink configuration between different execution modes (#4155) * [ZEPPELIN-5428] Unify flink configuration between different execution modes --- docs/interpreter/flink.md | 18 +++++------ .../src/main/resources/interpreter-setting.json | 18 +++++------ .../zeppelin/flink/FlinkScalaInterpreter.scala | 16 ++++++---- .../src/test/resources/log4j.properties | 2 +- .../src/test/resources/log4j2.properties | 4 +-- .../launcher/FlinkInterpreterLauncher.java | 36 ++++++++++++++++++++++ 6 files changed, 67 insertions(+), 27 deletions(-) diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md index 40d8d6f..27989b0 100644 --- a/docs/interpreter/flink.md +++ b/docs/interpreter/flink.md @@ -106,17 +106,17 @@ You can also add and set other flink properties which are not listed in the tabl <td>Port of running JobManager. Only used for remote mode</td> </tr> <tr> - <td>flink.jm.memory</td> - <td>1024</td> - <td>Total number of memory(mb) of JobManager</td> + <td>jobmanager.memory.process.size</td> + <td>1024m</td> + <td>Total number of memory of JobManager, e.g. 1024m. It is official [flink property](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/)</td> </tr> <tr> - <td>flink.tm.memory</td> - <td>1024</td> - <td>Total number of memory(mb) of TaskManager</td> + <td>taskmanager.memory.process.size</td> + <td>1024m</td> + <td>Total number of memory of TaskManager, e.g. 1024m. 
It is official [flink property](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/)</td> </tr> <tr> - <td>flink.tm.slot</td> + <td>taskmanager.numberOfTaskSlots</td> <td>1</td> <td>Number of slot per TaskManager</td> </tr> @@ -126,12 +126,12 @@ You can also add and set other flink properties which are not listed in the tabl <td>Total number of TaskManagers in local mode</td> </tr> <tr> - <td>flink.yarn.appName</td> + <td>yarn.application.name</td> <td>Zeppelin Flink Session</td> <td>Yarn app name</td> </tr> <tr> - <td>flink.yarn.queue</td> + <td>yarn.application.queue</td> <td>default</td> <td>queue name of yarn app</td> </tr> diff --git a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json index 0a87ddf..e5dd7a8 100644 --- a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json +++ b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json @@ -47,21 +47,21 @@ "description": "Port of running JobManager. Only used for remote mode", "type": "number" }, - "flink.jm.memory": { + "jobmanager.memory.process.size": { "envName": null, "propertyName": null, - "defaultValue": "1024", - "description": "Memory for JobManager (mb)", + "defaultValue": "1024m", + "description": "Memory for JobManager, e.g. 1024m", "type": "number" }, - "flink.tm.memory": { + "taskmanager.memory.process.size": { "envName": null, "propertyName": null, - "defaultValue": "1024", - "description": "Memory for TaskManager (mb)", + "defaultValue": "1024m", + "description": "Memory for TaskManager, e.g. 
1024m", "type": "number" }, - "flink.tm.slot": { + "taskmanager.numberOfTaskSlots": { "envName": null, "propertyName": null, "defaultValue": "1", @@ -75,14 +75,14 @@ "description": "Number of TaskManager in local mode", "type": "number" }, - "flink.yarn.appName": { + "yarn.application.name": { "envName": null, "propertyName": null, "defaultValue": "Zeppelin Flink Session", "description": "Yarn app name", "type": "string" }, - "flink.yarn.queue": { + "yarn.application.queue": { "envName": null, "propertyName": null, "defaultValue": "default", diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 9a5645c..96061a4 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -203,28 +203,32 @@ abstract class FlinkScalaInterpreter(val properties: Properties, this.configuration = GlobalConfiguration.loadConfiguration(flinkConfDir) var config = Config(executionMode = mode) - val jmMemory = properties.getProperty("flink.jm.memory", "1024") + val jmMemory = properties.getProperty("jobmanager.memory.process.size", + properties.getProperty("flink.jm.memory", "1024")) config = config.copy(yarnConfig = Some(ensureYarnConfig(config) .copy(jobManagerMemory = Some(jmMemory)))) - val tmMemory = properties.getProperty("flink.tm.memory", "1024") + val tmMemory = properties.getProperty("taskmanager.memory.process.size", + properties.getProperty("flink.tm.memory", "1024")) config = config.copy(yarnConfig = Some(ensureYarnConfig(config) .copy(taskManagerMemory = Some(tmMemory)))) - val appName = properties.getProperty("flink.yarn.appName", "Flink Yarn App Name") + val appName = properties.getProperty("yarn.application.name", + properties.getProperty("flink.yarn.appName", "Flink Yarn App 
Name")) config = config.copy(yarnConfig = Some(ensureYarnConfig(config) .copy(name = Some(appName)))) - val slotNum = Integer.parseInt(properties.getProperty("flink.tm.slot", "1")) + val slotNum = Integer.parseInt(properties.getProperty("taskmanager.numberOfTaskSlots", + properties.getProperty("flink.tm.slot", "1"))) config = config.copy(yarnConfig = Some(ensureYarnConfig(config) .copy(slots = Some(slotNum)))) - this.configuration.setInteger("taskmanager.numberOfTaskSlots", slotNum) - val queue = (properties.getProperty("flink.yarn.queue", "default")) + val queue = properties.getProperty("yarn.application.queue", + properties.getProperty("flink.yarn.queue", "default")) config = config.copy(yarnConfig = Some(ensureYarnConfig(config) .copy(queue = Some(queue)))) diff --git a/flink/flink-scala-parent/src/test/resources/log4j.properties b/flink/flink-scala-parent/src/test/resources/log4j.properties index ff1b634..69dd677 100644 --- a/flink/flink-scala-parent/src/test/resources/log4j.properties +++ b/flink/flink-scala-parent/src/test/resources/log4j.properties @@ -22,7 +22,7 @@ log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n log4j.logger.org.apache.hive=WARN -log4j.logger.org.apache.flink=INFO +log4j.logger.org.apache.flink=WARN log4j.logger.org.apache.zeppelin.flink=INFO log4j.logger.org.apache.zeppelin.python=WARN log4j.logger.org.apache.flink.streaming.api.operators.collect=ERROR diff --git a/flink/flink-scala-parent/src/test/resources/log4j2.properties b/flink/flink-scala-parent/src/test/resources/log4j2.properties index d003c42..965a410 100644 --- a/flink/flink-scala-parent/src/test/resources/log4j2.properties +++ b/flink/flink-scala-parent/src/test/resources/log4j2.properties @@ -21,8 +21,8 @@ rootLogger.level = INFO #rootLogger.appenderRef.file.ref = MainAppender # Uncomment this if you want to _only_ change Flink's logging -#logger.flink.name = org.apache.flink 
-#logger.flink.level = INFO +logger.flink.name = org.apache.flink +logger.flink.level = WARN # The following lines keep the log level of common libraries/connectors on # log level INFO. The root logger does not override this. You have to manually diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java index 21379f6..76017b2 100644 --- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { @@ -54,6 +55,8 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { envs.put("FLINK_LIB_DIR", flinkHome + "/lib"); envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins"); + normalizeConfiguration(context); + // yarn application mode specific logic if ("yarn-application".equalsIgnoreCase( context.getProperties().getProperty("flink.execution.mode"))) { @@ -66,6 +69,39 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { return envs; } + // do mapping between configuration of different execution modes. 
+ private void normalizeConfiguration(InterpreterLaunchContext context) { + Properties intpProperties = context.getProperties(); + setNewProperty(intpProperties, "flink.jm.memory", "jobmanager.memory.process.size", true); + setNewProperty(intpProperties, "flink.tm.memory", "taskmanager.memory.process.size", true); + setNewProperty(intpProperties, "flink.tm.slot", "taskmanager.numberOfTaskSlots", false); + setNewProperty(intpProperties, "flink.yarn.appName", "yarn.application.name", false); + setNewProperty(intpProperties, "flink.yarn.queue", "yarn.application.queue", false); + } + + /** + * Copies the value stored under the old key to the new key. Nothing is copied when the old + * value is blank or when the new key is already present in the properties. + * + * <p>flink.jm.memory and flink.tm.memory only support int value and the unit is mb (e.g. 1024), + * while a unit needs to be specified for jobmanager.memory.process.size and + * taskmanager.memory.process.size. For memory properties {@code mb} is therefore appended to + * the copied value, e.g. 1024 is stored as 1024mb. + * + * @param properties properties that are read and updated in place + * @param oldKey key whose value is read + * @param newKey key the value is stored under + * @param isMemoryProperty whether {@code mb} is appended to the value before it is stored + */ + private void setNewProperty(Properties properties, + String oldKey, + String newKey, + boolean isMemoryProperty) { + String value = properties.getProperty(oldKey); + if (StringUtils.isNotBlank(value) && !properties.containsKey(newKey)) { + if (isMemoryProperty) { + properties.put(newKey, value + "mb"); + } else { + properties.put(newKey, value); + } + } + } + private String chooseFlinkAppJar(String flinkHome) throws IOException { File flinkLibFolder = new File(flinkHome, "lib"); List<File> flinkDistFiles =