This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch engine-flink in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/engine-flink by this push:
     new 2ade921  KYLIN-3915 Make HADOOP_CLASSPATH configurable for Flink engine
2ade921 is described below

commit 2ade9215712e1b8020f2583721667a966a1525fe
Author: yanghua <yanghua1...@gmail.com>
AuthorDate: Wed Mar 27 23:23:08 2019 +0800

    KYLIN-3915 Make HADOOP_CLASSPATH configurable for Flink engine
---
 .../java/org/apache/kylin/engine/flink/FlinkExecutable.java | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 83cc815..6afd4b3 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -45,6 +45,7 @@ import org.apache.kylin.metadata.model.Segments;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -151,8 +152,7 @@ public class FlinkExecutable extends AbstractExecutable {
 
         String jars = this.getParam(JARS);
 
-        String hadoopConf = null;
-        hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
+        String hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
 
         if (StringUtils.isEmpty(hadoopConf)) {
             throw new RuntimeException(
@@ -161,6 +161,8 @@ public class FlinkExecutable extends AbstractExecutable {
 
         logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
 
+        String hadoopClasspathEnv = new File(hadoopConf).getParentFile().getAbsolutePath();
+
         String jobJar = config.getKylinJobJarPath();
         if (StringUtils.isEmpty(jars)) {
             jars = jobJar;
@@ -173,9 +175,9 @@ public class FlinkExecutable extends AbstractExecutable {
 
         StringBuilder sb = new StringBuilder();
         if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
-            sb.append("set HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=/usr/hdp/2.4.0.0-169/hadoop/ && %s/bin/flink run -m yarn-cluster ");
+            sb.append("set HADOOP_CONF_DIR=%s && set HADOOP_CLASSPATH=%s && %s/bin/flink run -m yarn-cluster ");
         } else {
-            sb.append("export HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=/usr/hdp/2.4.0.0-169/hadoop/ && %s/bin/flink run -m yarn-cluster ");
+            sb.append("export HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=%s && %s/bin/flink run -m yarn-cluster ");
         }
 
         Map<String, String> flinkConfs = config.getFlinkConfigOverride();
@@ -194,12 +196,11 @@ public class FlinkExecutable extends AbstractExecutable {
             }
 
             String onYarnConfigOptionKey = FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.get(entry.getKey());
-
             sb.append(" ").append(onYarnConfigOptionKey).append(" ").append(entry.getValue());
         }
 
         sb.append(" -c org.apache.kylin.common.util.FlinkEntry %s %s ");
-        final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf,
+        final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, hadoopClasspathEnv,
                 KylinConfig.getFlinkHome(), jars, formatArgs());
         logger.info("cmd: " + cmd);
         final ExecutorService executorService = Executors.newSingleThreadExecutor();