GitHub user bellemare opened an issue:
https://github.com/apache/incubator-predictionio/issues/337
Yarn support not working due to "," delimiter instead of "=" delimiter in env("SPARK_YARN_USER_ENV")
The PIO (trunk) Engine run() function populates SPARK_YARN_USER_ENV using the following code. Note that the key-value pairs are joined by commas:
https://github.com/apache/incubator-predictionio/blob/e4a3c0c9fc1251d7355d921acb66168226446b3f/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala#L416
```
// excerpted from the env map passed to the spark-submit process
"SPARK_YARN_USER_ENV" -> sys.env.filter(kv => kv._1.startsWith("PIO_")).
  map(kv => s"${kv._1}=${kv._2}").mkString(",")
```
Spark's YarnSparkHadoopUtil (trunk) reads SPARK_YARN_USER_ENV if it is set. Note that it first splits the string on "," and then strictly expects an "=" inside each resulting key-value pair:
https://github.com/apache/spark/blob/81e5619ca141a1d3a06547d2b682cbe3f135b360/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala#L148
```
def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = {
  if (inputString != null && inputString.length() > 0) {
    val childEnvs = inputString.split(",")
    val p = Pattern.compile(environmentVariableRegex)
    for (cEnv <- childEnvs) {
      val parts = cEnv.split("=") // split on '='
      val m = p.matcher(parts(1))
      val sb = new StringBuffer
      while (m.find()) {
        val variable = m.group(1)
        var replace = ""
        if (env.get(variable) != None) {
          replace = env.get(variable).get
        } else {
          // if this key is not configured for the child .. get it from the env
          replace = System.getenv(variable)
          if (replace == null) {
            // the env key is not present anywhere .. simply set it
            replace = ""
          }
        }
        m.appendReplacement(sb, Matcher.quoteReplacement(replace))
      }
      m.appendTail(sb)
      // This treats the environment variable as path variable delimited by `File.pathSeparator`
      // This is kept for backward compatibility and consistency with Hadoop's behavior
      addPathToEnvironment(env, parts(0), sb.toString)
    }
  }
}
```
The end result is that the SPARK_YARN_USER_ENV string PIO builds does not follow the convention Spark expects when creating a YARN client. Because a value can itself contain commas (the PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS value in the log below is one example), some of the comma-separated fragments contain no "=" at all, so the following line produces an array of length 1 and the subsequent parts(1) access fails:
```
val parts = cEnv.split("=") // split on '='
```
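A minimal, Spark-free sketch of the same parsing logic (just the two splits quoted above, no YARN involved) reproduces the failure on the kind of string PIO builds:
```
// Minimal repro sketch: the same split-on-',' then split-on-'=' logic as above.
val inputString =
  "PIO_HOME=/opt/PredictionIO," +
  "PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=xx.xx.xx.245,xx.xx.xx.156,xx.xx.xx.236"
for (cEnv <- inputString.split(",")) {
  val parts = cEnv.split("=")
  // The fragments "xx.xx.xx.156" and "xx.xx.xx.236" contain no '=', so parts
  // has length 1 and parts(1) throws
  // java.lang.ArrayIndexOutOfBoundsException: 1 -- the exception below.
  println(s"${parts(0)} -> ${parts(1)}")
}
```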
And you end up getting this exception when trying to launch a Spark YARN client:
```
2017-01-17 09:56:20,362 INFO [email protected]$ - Using existing engine manifest JSON at /home/path/to/work/dir/manifest.json
2017-01-17 09:56:20,364 WARN [email protected]$ - template.json does not exist. Template metadata will not be available. (This is safe to ignore if you are not working on a template.)
2017-01-17 09:56:21,648 INFO [email protected]$ - Submission command: /opt/spark-current/bin/spark-submit --master yarn --deploy-mode client --driver-memory 5G --conf spark.driver.maxResultSize=2G --conf spark.memory.fraction=0.75 --executor-memory 25G --conf spark.executor.cores=16 --total-executor-cores 45 --conf spark.task.maxFailures=1 --conf spark.ui.retainedJobs=111 --conf spark.ui.retainedStages=1111 --conf spark.eventLog.enabled=true --conf spark.logConf=true --conf spark.executor.extraJavaOptions=-Djava.util.Arrays.useLegacyMergeSort=true --class io.prediction.workflow.CreateWorkflow --jars file:/home/path/to/work/dir/target/scala-2.10/flyer-affinity-assembly-1.20160823.0-deps.jar,file:/home/path/to/work/dir/target/scala-2.10/flyer-affinity_2.10-1.20160823.0.jar --files file:/opt/PredictionIO/conf/log4j.properties,file:/opt/hadoop-current/etc/hadoop/core-site.xml,file:/opt/hbase-current/conf/hbase-site.xml --driver-class-path /opt/PredictionIO/conf:/opt/hadoop-current/etc/hadoop:/opt/hbase-current/conf file:/opt/PredictionIO/lib/pio-assembly-0.9.5.jar --engine-id qJOA3WjMK8L8II8NqEMcRnAf7id45cDd --engine-version 886f3f5ac890f5d82e6be2d0ca82fe1f4ed7a84a --engine-variant file:/home/path/to/work/dir/engine.json --verbosity 0 --json-extractor Both --env PIO_STORAGE_SOURCES_HBASE_TYPE=hbase,PIO_ENV_LOADED=1,PIO_STORAGE_REPOSITORIES_METADATA_NAME=pio_meta,PIO_FS_BASEDIR=/pio-apps,PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=xx.xx.xx.245,xx.xx.xx.156,xx.xx.xx.236,PIO_HOME=/opt/PredictionIO,PIO_FS_ENGINESDIR=/pio-apps/engines,PIO_STORAGE_SOURCES_HDFS_TYPE=hdfs,PIO_STORAGE_SOURCES_HDFS_PATH=/pio-apps/models,PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch,PIO_STORAGE_REPOSITORIES_METADATA_SOURCE=ELASTICSEARCH,PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE=HDFS,PIO_STORAGE_REPOSITORIES_EVENTDATA_NAME=pio_event,PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=pio-staging-elasticsearch,PIO_FS_TMPDIR=/pio-apps/tmp,PIO_STORAGE_REPOSITORIES_MODELDATA_NAME=pio_model,PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE=HBASE,PIO_CONF_DIR=/opt/PredictionIO/conf,PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9300
2017-01-17 09:56:22,884 WARN [email protected] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-01-17 09:56:23,786 INFO [email protected] - Extracting datasource params...
2017-01-17 09:56:23,856 INFO [email protected]$ - No 'name' is found. Default empty String will be used.
2017-01-17 09:56:24,255 INFO [email protected] - Datasource params: (,DataSourceParams(List(eventA, eventB, eventC),Bar,FooBar,Foo,None,None,Some(1),Some(1),None))
2017-01-17 09:56:24,256 INFO [email protected] - Extracting preparator params...
2017-01-17 09:56:24,257 INFO [email protected] - Preparator params: (,Empty)
2017-01-17 09:56:24,262 INFO [email protected] - Extracting serving params...
2017-01-17 09:56:24,262 INFO [email protected] - Serving params: (,Empty)
2017-01-17 09:56:26,051 INFO sparkDriverActorSystem-akka.actor.default-dispatcher-3@Remoting - Starting remoting
2017-01-17 09:56:26,173 INFO sparkDriverActorSystem-akka.actor.default-dispatcher-3@Remoting - Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@<IP-Removed>:43037]
2017-01-17 09:56:27,702 ERROR [email protected] - Error initializing SparkContext.
java.lang.ArrayIndexOutOfBoundsException: 1
        at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:264)
        at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:262)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.setEnvFromInputString(YarnSparkHadoopUtil.scala:262)
        at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:640)
        at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:638)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:638)
        at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:726)
        at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:142)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:144)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:530)
        at io.prediction.workflow.WorkflowContext$.apply(WorkflowContext.scala:42)
        at io.prediction.workflow.CoreWorkflow$.runTrain(CoreWorkflow.scala:57)
        at io.prediction.workflow.CreateWorkflow$.main(CreateWorkflow.scala:247)
        at io.prediction.workflow.CreateWorkflow.main(CreateWorkflow.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2017-01-17 09:56:27,787 WARN [email protected] - Stopping a MetricsSystem that is not running
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
        at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:264)
        at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:262)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.setEnvFromInputString(YarnSparkHadoopUtil.scala:262)
        at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:640)
        at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:638)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:638)
        at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:726)
        at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:142)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:144)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:530)
        at io.prediction.workflow.WorkflowContext$.apply(WorkflowContext.scala:42)
        at io.prediction.workflow.CoreWorkflow$.runTrain(CoreWorkflow.scala:57)
        at io.prediction.workflow.CreateWorkflow$.main(CreateWorkflow.scala:247)
        at io.prediction.workflow.CreateWorkflow.main(CreateWorkflow.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
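For what it's worth, here is a hedged sketch of one possible direction for a fix (illustrative only, not code from either project, and the function name parseUserEnv is hypothetical): when parsing, re-attach comma-separated fragments that lack an "=" to the preceding value, so comma-containing values like the Elasticsearch host list survive the round trip:
```
// Illustrative sketch only: a tolerant parser that folds fragments without
// an '=' back into the preceding value, e.g. "HOSTS=a,b,c" -> ("HOSTS", "a,b,c").
def parseUserEnv(inputString: String): Map[String, String] =
  inputString.split(",").foldLeft(Vector.empty[(String, String)]) { (acc, fragment) =>
    fragment.split("=", 2) match {
      case Array(k, v) => acc :+ (k -> v)        // normal KEY=VALUE pair
      case Array(orphan) if acc.nonEmpty =>      // no '=': the comma was part of a value
        acc.init :+ (acc.last._1 -> s"${acc.last._2},$orphan")
      case _ => acc                              // leading orphan: nothing to attach to
    }
  }.toMap

// parseUserEnv("PIO_HOME=/opt/PredictionIO,HOSTS=a,b,c")
// => Map("PIO_HOME" -> "/opt/PredictionIO", "HOSTS" -> "a,b,c")
```
Alternatively, PIO could avoid the ambiguity on the producing side, for example by escaping or dropping commas inside values before joining, since Spark's parser gives commas special meaning.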