This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch engine-flink in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/engine-flink by this push: new ca7ffe6 KYLIN-4110 Provider config options to specify yarn queue and nodelabel ca7ffe6 is described below commit ca7ffe6aa524408bd831a8f68df090cf349987c4 Author: yanghua <yanghua1...@gmail.com> AuthorDate: Thu Jul 25 11:54:45 2019 +0800 KYLIN-4110 Provider config options to specify yarn queue and nodelabel --- core-common/src/main/resources/kylin-defaults.properties | 2 ++ .../src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java | 2 +- .../java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java | 3 +++ examples/test_case_data/sandbox/kylin.properties | 2 ++ 4 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index 8d24c71..73801e0 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -339,6 +339,8 @@ kylin.engine.flink-conf.taskmanager.numberOfTaskSlots=1 kylin.engine.flink-conf.taskmanager.memory.preallocate=false kylin.engine.flink-conf.job.parallelism=1 kylin.engine.flink-conf.program.enableObjectReuse=false +kylin.engine.flink-conf.yarn.queue= +kylin.engine.flink-conf.yarn.nodelabel= ### QUERY PUSH DOWN ### diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java index 129b3cf..95e9c4f 100644 --- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java +++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java @@ -203,7 +203,7 @@ public class FlinkExecutable extends AbstractExecutable { } else { String configOptionKey = FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.get(entry.getKey()); //flink on yarn specific option (pattern : -yn 1) - if (configOptionKey.startsWith("-y")) { + if (configOptionKey.startsWith("-y") && !entry.getValue().isEmpty()) { sb.append(" ").append(configOptionKey).append(" ").append(entry.getValue()); } else { //flink on yarn specific option (pattern : -yD taskmanager.network.memory.min=536346624) diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java index fe64f0b..d0371ca 100644 --- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java +++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java @@ -74,6 +74,9 @@ public class FlinkOnYarnConfigMapping { } } + //config options do not have mapping with config file key + flinkOnYarnConfigMap.put("yarn.queue", "-yqu"); + flinkOnYarnConfigMap.put("yarn.nodelabel", "-ynl"); } } diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 15265d3..70ee482 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -213,6 +213,8 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2 kylin.engine.flink-conf.jobmanager.heap.size=2G kylin.engine.flink-conf.taskmanager.heap.size=4G kylin.engine.flink-conf.taskmanager.numberOfTaskSlots=1 +kylin.engine.flink-conf.yarn.queue= +kylin.engine.flink-conf.yarn.nodelabel= ### QUERY PUSH DOWN ### #kylin.query.pushdown.runner-class-name=org.apache.kylin.query.adhoc.PushDownRunnerJdbcImpl