[
https://issues.apache.org/jira/browse/LIVY-995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jianzhen Wu updated LIVY-995:
-----------------------------
Component/s: REPL
Fix Version/s: 0.9.0
> JsonParseException is thrown when closing Livy session when using python
> profile
> --------------------------------------------------------------------------------
>
> Key: LIVY-995
> URL: https://issues.apache.org/jira/browse/LIVY-995
> Project: Livy
> Issue Type: Improvement
> Components: REPL
> Reporter: Jianzhen Wu
> Assignee: Jianzhen Wu
> Priority: Critical
> Fix For: 0.9.0
>
>
> Start PySpark with spark.python.profile enabled:
> {code:bash}
> ./bin/pyspark --master local --conf spark.python.profile=true
> {code}
>
> Execute some Spark RDD code. When PySpark is closed, it prints the profile information:
> {code:java}
> >>> rdd = sc.parallelize(range(100)).map(str)
> >>> rdd.count()
> [Stage 0:> (0 + 1) / 1]
> 100
> >>>
> ============================================================
> Profile of RDD<id=1>
> ============================================================
> 244 function calls (241 primitive calls) in 0.001 seconds
>
> Ordered by: internal time, cumulative time
>
> ncalls tottime percall cumtime percall filename:lineno(function)
> 101 0.000 0.000 0.000 0.000 rdd.py:1237(<genexpr>)
> 101 0.000 0.000 0.000 0.000 util.py:72(wrapper)
> 1 0.000 0.000 0.000 0.000 serializers.py:255(dump_stream)
> 1 0.000 0.000 0.000 0.000 serializers.py:213(load_stream)
> 2 0.000 0.000 0.000 0.000 {built-in method builtins.sum}
> 1 0.000 0.000 0.001 0.001 worker.py:607(process)
> 1 0.000 0.000 0.000 0.000 context.py:549(f)
> 1 0.000 0.000 0.000 0.000 {built-in method _pickle.dumps}
> 1 0.000 0.000 0.000 0.000 serializers.py:561(read_int)
> 1 0.000 0.000 0.000 0.000 serializers.py:568(write_int)
> 4/1 0.000 0.000 0.000 0.000 rdd.py:2917(pipeline_func)
> 1 0.000 0.000 0.000 0.000 serializers.py:426(dumps)
> 1 0.000 0.000 0.000 0.000 rdd.py:1237(<lambda>)
> 1 0.000 0.000 0.000 0.000 serializers.py:135(load_stream)
> 2 0.000 0.000 0.000 0.000 rdd.py:1072(func)
> 1 0.000 0.000 0.000 0.000 rdd.py:384(func)
> 1 0.000 0.000 0.000 0.000 util.py:67(fail_on_stopiteration)
> 1 0.000 0.000 0.000 0.000 serializers.py:151(_read_with_length)
> 2 0.000 0.000 0.000 0.000 context.py:546(getStart)
> 3 0.000 0.000 0.000 0.000 rdd.py:416(func)
> 1 0.000 0.000 0.000 0.000 serializers.py:216(_load_stream_without_unbatching)
> 2 0.000 0.000 0.000 0.000 {method 'write' of '_io.BufferedWriter' objects}
> 1 0.000 0.000 0.000 0.000 {method 'read' of '_io.BufferedReader' objects}
> 1 0.000 0.000 0.000 0.000 {built-in method _operator.add}
> 1 0.000 0.000 0.000 0.000 {built-in method builtins.hasattr}
> 3 0.000 0.000 0.000 0.000 {built-in method builtins.len}
> 1 0.000 0.000 0.000 0.000 {built-in method _struct.unpack}
> 1 0.000 0.000 0.000 0.000 rdd.py:1226(<lambda>)
> 1 0.000 0.000 0.000 0.000 {method 'close' of 'generator' objects}
> 1 0.000 0.000 0.000 0.000 {built-in method from_iterable}
> 1 0.000 0.000 0.000 0.000 {built-in method _struct.pack}
> 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
> 1 0.000 0.000 0.000 0.000 {built-in method builtins.iter}
> {code}
>
> This happens because Spark registers show_profiles as an atexit hook in profile.py:
> {code:python}
> def add_profiler(self, id, profiler):
>     """Add a profiler for RDD/UDF `id`"""
>     if not self.profilers:
>         if self.profile_dump_path:
>             atexit.register(self.dump_profiles, self.profile_dump_path)
>         else:
>             atexit.register(self.show_profiles)
>
>     self.profilers.append([id, profiler, False])
> {code}
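>
> The effect is easy to reproduce outside of Spark. Below is a minimal standalone sketch (illustrative only, not Livy or Spark code): it registers an atexit callback that prints a plain-text report the way show_profiles does, and shows that such a line is rejected by a JSON parser, which is exactly what the Scala side runs into.
> {code:python}
> import atexit
> import json
>
> def show_profiles():
>     # Mimics pyspark/profiler.py: a plain-text report printed at interpreter exit.
>     print("=" * 60)
>     print("Profile of RDD<id=1>")
>     print("=" * 60)
>
> # Registered once the first profiler is added, as in add_profiler() above.
> atexit.register(show_profiles)
>
> # Any reader that treats this process's stdout as a JSON-lines protocol will
> # see the "====..." lines after the last real message and fail to parse them:
> try:
>     json.loads("=" * 60)
> except json.JSONDecodeError as err:
>     print("not JSON:", err)
> {code}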
>
>
> For a Livy session, this output is not converted to JSON, so Livy throws the exception below when it tries to parse it:
>
> {code:java}
> 24/01/17 11:17:30 INFO [shutdown-hook-0] ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('=' (code 61)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
>  at [Source: (String)"============================================================"; line: 1, column: 2]
>  at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
>  at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
>  at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:635)
>  at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1952)
>  at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:781)
>  at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:355)
>  at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2023)
>  at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1491)
>  at org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse(JsonMethods.scala:33)
>  at org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse$(JsonMethods.scala:20)
>  at org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:71)
>  at org.apache.livy.repl.PythonInterpreter.$anonfun$sendRequest$1(PythonInterpreter.scala:288)
>  at scala.Option.map(Option.scala:230)
>  at org.apache.livy.repl.PythonInterpreter.sendRequest(PythonInterpreter.scala:287)
>  at org.apache.livy.repl.PythonInterpreter.sendShutdownRequest(PythonInterpreter.scala:277)
>  at org.apache.livy.repl.ProcessInterpreter.close(ProcessInterpreter.scala:62)
>  at org.apache.livy.repl.PythonInterpreter.close(PythonInterpreter.scala:234)
>  at org.apache.livy.repl.Session.$anonfun$close$1(Session.scala:232)
>  at org.apache.livy.repl.Session.$anonfun$close$1$adapted(Session.scala:232)
>  at scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
>  at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>  at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>  at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
>  at org.apache.livy.repl.Session.close(Session.scala:232)
>  at org.apache.livy.toolkit.IpynbBootstrap.close(IpynbBootstrap.scala:246)
>  at org.apache.livy.toolkit.IpynbBootstrap$.main(IpynbBootstrap.scala:72)
>  at org.apache.livy.toolkit.IpynbBootstrap.main(IpynbBootstrap.scala)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:764)
> )
> {code}
> Livy sends the shutdown request from PythonInterpreter.scala and parses the next line read from stdout as JSON:
> {code:scala}
> override protected def sendShutdownRequest(): Unit = {
>   sendRequest(Map(
>     "msg_type" -> "shutdown_request",
>     "content" -> ()
>   )).foreach { case rep =>
>     warn(f"process failed to shut down while returning $rep")
>   }
> }
>
> private def sendRequest(request: Map[String, Any]): Option[JValue] = {
>   stdin.println(write(request))
>   stdin.flush()
>   Option(stdout.readLine()).map { case line =>
>     parse(line)
>   }
> }
> {code}
> On shutdown, fake_shell.py simply exits; nothing converts what is written to stdout at exit into JSON:
> {code:python}
> def shutdown_request(_content):
>     sys.exit()
>
> msg_type_router = {
>     'execute_request': execute_request,
>     'shutdown_request': shutdown_request,
> }
>
> try:
>     handler = msg_type_router[msg_type]
> except KeyError:
>     LOG.error('unknown message type: %s', msg_type)
>     continue
>
> response = handler(content)
> {code}
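>
> One possible direction, sketched below, would be to keep the atexit output off the JSON channel. This is only an illustration, not the committed fix; it assumes the profiler's atexit hooks write to whatever sys.stdout points to when they run and that nothing restores sys.stdout afterwards:
> {code:python}
> import sys
>
> def shutdown_request(_content):
>     # Hypothetical mitigation: route anything printed by atexit hooks
>     # (e.g. show_profiles) to stderr, so the stdout channel that carries
>     # the interpreter's JSON replies stays parseable.
>     sys.stdout.flush()
>     sys.stdout = sys.stderr
>     sys.exit()
> {code}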
--
This message was sent by Atlassian Jira
(v8.20.10#820010)