This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 281c591  [SPARK-31399][CORE][2.4] Support indylambda Scala closure in 
ClosureCleaner
281c591 is described below

commit 281c591ef9a7fdcb3838ee68857b9bb91a3ca4f1
Author: Kris Mok <[email protected]>
AuthorDate: Tue May 19 19:29:33 2020 +0000

    [SPARK-31399][CORE][2.4] Support indylambda Scala closure in ClosureCleaner
    
    This is a backport of https://github.com/apache/spark/pull/28463 from 
Apache Spark master/3.0 to 2.4.
    Minor adaptations include:
    - Retain the Spark 2.4-specific behavior of skipping the indylambda check 
when using Scala 2.11
    - Remove unnecessary LMF restrictions in ClosureCleaner tests
    - Address review comments in the original PR from kiszk
    
    Tested with the default Scala 2.11 build, and also ran the ClosureCleaner-related tests in a Scala 2.12 build:
    - repl: `SingletonReplSuite`
    - core: `ClosureCleanerSuite` and `ClosureCleanerSuite2`
    
    ---
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to enhance Spark's `ClosureCleaner` to support 
"indylambda" style of Scala closures to the same level as the existing 
implementation for the old (inner class) style ones. The goal is to reach 
feature parity with the support of the old style Scala closures, with as close 
to bug-for-bug compatibility as possible.
    
    Specifically, this PR addresses one gap in the indylambda closure support relative to the inner class closures:
    - When a closure is declared in a Scala REPL and captures the enclosing REPL line object, such a closure should be cleanable (unreferenced fields on the enclosing REPL line object should be cleaned)
    
    This PR maintains the same limitations in the new indylambda closure 
support as the old inner class closures, in particular the following two:
    - Cleaning is only available for one level of REPL line object. If a 
closure captures state from a REPL line object further out from the immediate 
enclosing one, it won't be subject to cleaning. See example below.
    - "Sibling" closures are not handled yet. A "sibling" closure is defined 
here as a closure that is directly or indirectly referenced by the starting 
closure, but isn't lexically enclosing. e.g.
      ```scala
      {
        val siblingClosure = (x: Int) => x + this.fieldA   // captures `this`, references `fieldA` on `this`.
        val startingClosure = (y: Int) => y + this.fieldB + siblingClosure(y)  // captures `this` and `siblingClosure`, references `fieldB` on `this`.
      }
      ```
    
    The changes are intended to be minimal, with further code cleanups planned 
in separate PRs.
    
    Terminology:
    - old, inner class style Scala closures, aka `delambdafy:inline`: default 
in Scala 2.11 and before
    - new, "indylambda" style Scala closures, aka `delambdafy:method`: default 
in Scala 2.12 and later
    
    ### Why are the changes needed?
    
    There had been previous efforts to extend Spark's `ClosureCleaner` to 
support "indylambda" Scala closures, which is necessary for proper Scala 2.12 
support. Most notably the work done for 
[SPARK-14540](https://issues.apache.org/jira/browse/SPARK-14540).
    
    But the previous efforts had missed one important scenario: a Scala closure declared in a Scala REPL that captures the enclosing `this`, which is a REPL line object. For example, in a Spark Shell:
    ```scala
    :pa
    class NotSerializableClass(val x: Int)
    val ns = new NotSerializableClass(42)
    val topLevelValue = "someValue"
    val func = (j: Int) => {
      (1 to j).flatMap { x =>
        (1 to x).map { y => y + topLevelValue }
      }
    }
    <Ctrl+D>
    sc.parallelize(0 to 2).map(func).collect
    ```
    In this example, `func` refers to a Scala closure that captures the 
enclosing `this` because it needs to access `topLevelValue`, which is in turn 
implemented as a field on the enclosing REPL line object.
    
    The existing `ClosureCleaner` in Spark supports cleaning this case in Scala 
2.11-, and this PR brings feature parity to Scala 2.12+.
    
    Note that the existing cleaning logic only supported one level of REPL line 
object nesting. This PR does not go beyond that. When a closure references 
state declared a few commands earlier, the cleaning will fail in both Scala 
2.11 and Scala 2.12. e.g.
    ```scala
    scala> :pa
    // Entering paste mode (ctrl-D to finish)
    
    class NotSerializableClass1(val x: Int)
    case class Foo(id: String)
    val ns = new NotSerializableClass1(42)
    val topLevelValue = "someValue"
    
    // Exiting paste mode, now interpreting.
    
    defined class NotSerializableClass1
    defined class Foo
    ns: NotSerializableClass1 = NotSerializableClass1@615b1baf
    topLevelValue: String = someValue
    
    scala> :pa
    // Entering paste mode (ctrl-D to finish)
    
    val closure2 = (j: Int) => {
      (1 to j).flatMap { x =>
        (1 to x).map { y => y + topLevelValue } // 2 levels
      }
    }
    
    // Exiting paste mode, now interpreting.
    
    closure2: Int => scala.collection.immutable.IndexedSeq[String] = <function1>
    
    scala> sc.parallelize(0 to 2).map(closure2).collect
    org.apache.spark.SparkException: Task not serializable
    ...
    ```
    in the Scala 2.11 / Spark 2.4.x case:
    ```
    Caused by: java.io.NotSerializableException: NotSerializableClass1
    Serialization stack:
        - object not serializable (class: NotSerializableClass1, value: NotSerializableClass1@615b1baf)
        - field (class: $iw, name: ns, type: class NotSerializableClass1)
        - object (class $iw, $iw@64df3f4b)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@66e6e5e9)
        - field (class: $line14.$read, name: $iw, type: class $iw)
        - object (class $line14.$read, $line14.$read@c310aa3)
        - field (class: $iw, name: $line14$read, type: class $line14.$read)
        - object (class $iw, $iw@79224636)
        - field (class: $iw, name: $outer, type: class $iw)
        - object (class $iw, $iw@636d4cdc)
        - field (class: $anonfun$1, name: $outer, type: class $iw)
        - object (class $anonfun$1, <function1>)
    ```
    in the Scala 2.12 / Spark 2.4.x case after this PR:
    ```
    Caused by: java.io.NotSerializableException: NotSerializableClass1
    Serialization stack:
        - object not serializable (class: NotSerializableClass1, value: NotSerializableClass1@6f3b4c9a)
        - field (class: $iw, name: ns, type: class NotSerializableClass1)
        - object (class $iw, $iw@2945a3c1)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@152705d0)
        - field (class: $line14.$read, name: $iw, type: class $iw)
        - object (class $line14.$read, $line14.$read@7cf311eb)
        - field (class: $iw, name: $line14$read, type: class $line14.$read)
        - object (class $iw, $iw@d980dac)
        - field (class: $iw, name: $outer, type: class $iw)
        - object (class $iw, $iw@557d9532)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$closure2$1$adapted:(L$iw;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, instantiatedMethodType=(Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class $Lambda$2103/815179920, $Lambda$2103/815179920@569b57c4)
    ```
    
    For more background of the new and old ways Scala lowers closures to Java 
bytecode, please see [A note on how NSC (New Scala Compiler) lowers 
lambdas](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-notes-md).
    
    For more background on how Spark's `ClosureCleaner` works and what's needed 
to make it support "indylambda" Scala closures, please refer to [A Note on 
Apache Spark's 
ClosureCleaner](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-spark_closurecleaner_notes-md).
    
    #### tl;dr
    
    The `ClosureCleaner` works like a mark-sweep algorithm on fields:
    - Finding (a chain of) outer objects referenced by the starting closure;
    - Scanning the starting closure and its inner closures and marking the 
fields on the outer objects accessed;
    - Cloning the outer objects, nulling out fields that are not accessed by 
any closure of concern.
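    
    The "clone and null out" step can be illustrated with a minimal sketch. It is not Spark's implementation (the patch delegates to `cloneAndSetFields`); `LineObject` and `FieldSweepSketch` are hypothetical names used only here, and the sketch assumes all fields are reference-typed and that the set of accessed field names has already been computed by the marking step.
    ```scala
    // Hypothetical stand-in for a REPL line object; not a Spark class.
    class LineObject {
      var topLevelValue: String = "someValue"  // accessed by the closure
      var ns: AnyRef = new Object              // never accessed by the closure
    }

    object FieldSweepSketch {
      // Copy only the fields named in `accessed`; null out every other field.
      // Simplification: the copy is created through the normal constructor.
      def cloneKeeping(obj: LineObject, accessed: Set[String]): LineObject = {
        val copy = new LineObject
        for (field <- classOf[LineObject].getDeclaredFields) {
          field.setAccessible(true)
          val value = if (accessed.contains(field.getName)) field.get(obj) else null
          field.set(copy, value)
        }
        copy
      }
    }
    ```
    The original object is left untouched; only the copy handed to the closure loses the unreferenced state.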
    
    ##### Outer Objects
    
    For the old, inner class style Scala closures, the "outer objects" are defined as the lexically enclosing closures of the starting closure, plus an optional enclosing REPL line object if these closures are defined in a Scala REPL. All of them are on a singly-linked `$outer` chain.
    
    For the new, "indylambda" style Scala closures, the capturing 
implementation changed, so closures no longer refer to their enclosing closures 
via an `$outer` chain. However, a closure can still capture its enclosing REPL 
line object, much like the old style closures. The name of the field that 
captures this reference would be `arg$1` (instead of `$outer`).
    
    So what's missing in the `ClosureCleaner` for the "indylambda" support is finding, and potentially cloning and cleaning, the captured enclosing `this` REPL line object. That's what this PR implements.
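    
    As a rough illustration of how the captured `this` can be observed, the sketch below reads a closure's `SerializedLambda` through the same reflective `writeReplace` call that `inspect` uses in this patch. `Holder`, `ProxySketch` and `capturedOuter` are hypothetical names, and the sketch assumes Scala 2.12 or later, where lambdas are compiled in the indylambda style and are serializable.
    ```scala
    import java.lang.invoke.SerializedLambda

    // Hypothetical class standing in for a REPL line object.
    class Holder(val base: Int) extends Serializable {
      // The lambda reads `base`, so it has to capture the enclosing `this`.
      val addBase: Int => Int = (x: Int) => x + base
    }

    object ProxySketch {
      // Same reflective `writeReplace` call as `inspect` in this patch.
      def proxyOf(closure: AnyRef): SerializedLambda = {
        val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
        writeReplace.setAccessible(true)
        writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
      }

      // The captured enclosing `this`, if any, is the first captured argument.
      def capturedOuter(proxy: SerializedLambda): Option[AnyRef] =
        if (proxy.getCapturedArgCount > 0) Option(proxy.getCapturedArg(0)) else None
    }
    ```
    Calling `ProxySketch.capturedOuter(ProxySketch.proxyOf(new Holder(1).addBase))` should yield the `Holder` instance, which is the object the cleaner would clone and clean if it were a REPL line object.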
    
    ##### Inner Closures
    
    The old, inner class style of Scala closures are compiled into separate 
inner classes, one per lambda body. So in order to discover the implementation 
(bytecode) of the inner closures, one has to jump over multiple classes. The 
name of such a class would contain the marker substring `$anonfun$`.
    
    The new, "indylambda" style Scala closures are compiled into **static 
methods** in the class where the lambdas were declared. So for lexically nested 
closures, their lambda bodies would all be compiled into static methods **in 
the same class**. This makes it much easier to discover the implementation 
(bytecode) of the nested lambda bodies. The name of such a static method would 
contain the marker substring `$anonfun$`.
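    
    A small reflective check can make this layout visible. The sketch below lists the static methods on a class whose names contain `$anonfun$`; `Nested` and `LambdaBodySketch` are hypothetical names, and the result depends on the compiler, so it assumes a Scala 2.12 or 2.13 build with the default indylambda encoding.
    ```scala
    import java.lang.reflect.Modifier

    // Hypothetical class with two lexically nested lambdas.
    class Nested {
      def run(j: Int): Seq[Int] =
        (1 to j).flatMap { x => (1 to x).map { y => y + x } }
    }

    object LambdaBodySketch {
      // Names of the static methods on `cls` that look like lambda bodies.
      def lambdaBodies(cls: Class[_]): Seq[String] =
        cls.getDeclaredMethods.toSeq
          .filter(m => Modifier.isStatic(m.getModifiers) && m.getName.contains("$anonfun$"))
          .map(_.getName)
    }
    ```
    Under those assumptions, `LambdaBodySketch.lambdaBodies(classOf[Nested])` should report the bodies of both the outer and the inner lambda on `Nested` itself, rather than on separate inner classes.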
    
    Discovery of inner closures involves scanning bytecode for certain patterns 
that represent the creation of a closure object for the inner closure.
    - For inner class style: the closure object creation site is like `new 
<InnerClassForTheClosure>(captured args)`
    - For "indylambda" style: the closure object creation site would be 
compiled into an `invokedynamic` instruction, with its "bootstrap method" 
pointing to the same one used by Java 8 for its serializable lambdas, and with 
the bootstrap method arguments pointing to the implementation method.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Before this PR, Spark 2.4 / 3.0 / master on Scala 2.12 would not support Scala closures declared in a Scala REPL that capture anything from the REPL line objects. After this PR, such scenarios are supported.
    
    ### How was this patch tested?
    
    Added a new unit test case to `org.apache.spark.repl.SingletonReplSuite`. The new test case fails without the fix in this PR, and passes with the fix.
    
    Closes #28463 from rednaxelafx/closure-cleaner-indylambda.
    
    Authored-by: Kris Mok <kris.mok@databricks.com>
    Signed-off-by: Wenchen Fan <wenchen@databricks.com>
    (cherry picked from commit dc01b7556f74e4a9873ceb1f78bc7df4e2ab4a8a)
    Signed-off-by: Kris Mok <kris.mok@databricks.com>
    
    Closes #28577 from rednaxelafx/backport-spark-31399-2.4.
    
    Authored-by: Kris Mok <[email protected]>
    Signed-off-by: DB Tsai <[email protected]>
---
 .../org/apache/spark/util/ClosureCleaner.scala     | 432 ++++++++++++++++++---
 .../apache/spark/util/ClosureCleanerSuite.scala    |   3 -
 .../apache/spark/util/ClosureCleanerSuite2.scala   |   4 +-
 .../org/apache/spark/repl/SingletonReplSuite.scala |  61 +++
 4 files changed, 443 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala 
b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 43d6256..e8abbdf 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -18,13 +18,16 @@
 package org.apache.spark.util
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-import java.lang.invoke.SerializedLambda
+import java.lang.invoke.{MethodHandleInfo, SerializedLambda}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{Map, Set, Stack}
 import scala.language.existentials
 
-import org.apache.xbean.asm6.{ClassReader, ClassVisitor, MethodVisitor, Type}
+import org.apache.commons.lang3.ClassUtils
+import org.apache.xbean.asm6.{ClassReader, ClassVisitor, Handle, 
MethodVisitor, Type}
 import org.apache.xbean.asm6.Opcodes._
+import org.apache.xbean.asm6.tree.{ClassNode, MethodNode}
 
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.internal.Logging
@@ -34,8 +37,6 @@ import org.apache.spark.internal.Logging
  */
 private[spark] object ClosureCleaner extends Logging {
 
-  private val isScala2_11 = 
scala.util.Properties.versionString.contains("2.11")
-
   // Get an ASM class reader for a given class from the JAR that loaded it
   private[util] def getClassReader(cls: Class[_]): ClassReader = {
     // Copy data over, before delegating to ClassReader - else we can run out 
of open file handles.
@@ -163,42 +164,6 @@ private[spark] object ClosureCleaner extends Logging {
   }
 
   /**
-   * Try to get a serialized Lambda from the closure.
-   *
-   * @param closure the closure to check.
-   */
-  private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = 
{
-    if (isScala2_11) {
-      return None
-    }
-    val isClosureCandidate =
-      closure.getClass.isSynthetic &&
-        closure
-          .getClass
-          .getInterfaces.exists(_.getName == "scala.Serializable")
-
-    if (isClosureCandidate) {
-      try {
-        Option(inspect(closure))
-      } catch {
-        case e: Exception =>
-          // no need to check if debug is enabled here the Spark
-          // logging api covers this.
-          logDebug("Closure is not a serialized lambda.", e)
-          None
-      }
-    } else {
-      None
-    }
-  }
-
-  private def inspect(closure: AnyRef): SerializedLambda = {
-    val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
-    writeReplace.setAccessible(true)
-    
writeReplace.invoke(closure).asInstanceOf[java.lang.invoke.SerializedLambda]
-  }
-
-  /**
    * Helper method to clean the given closure in place.
    *
    * The mechanism is to traverse the hierarchy of enclosing closures and null 
out any
@@ -245,12 +210,12 @@ private[spark] object ClosureCleaner extends Logging {
       cleanTransitively: Boolean,
       accessedFields: Map[Class[_], Set[String]]): Unit = {
 
-    // most likely to be the case with 2.12, 2.13
+    // indylambda check. Most likely to be the case with 2.12, 2.13
     // so we check first
     // non LMF-closures should be less frequent from now on
-    val lambdaFunc = getSerializedLambda(func)
+    val maybeIndylambdaProxy = 
IndylambdaScalaClosures.getSerializationProxy(func)
 
-    if (!isClosure(func.getClass) && lambdaFunc.isEmpty) {
+    if (!isClosure(func.getClass) && maybeIndylambdaProxy.isEmpty) {
       logDebug(s"Expected a closure; got ${func.getClass.getName}")
       return
     }
@@ -262,7 +227,7 @@ private[spark] object ClosureCleaner extends Logging {
       return
     }
 
-    if (lambdaFunc.isEmpty) {
+    if (maybeIndylambdaProxy.isEmpty) {
       logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
 
       // A list of classes that represents closures enclosed in the given one
@@ -308,7 +273,7 @@ private[spark] object ClosureCleaner extends Logging {
         }
       }
 
-      logDebug(s" + fields accessed by starting closure: " + 
accessedFields.size)
+      logDebug(s" + fields accessed by starting closure: 
${accessedFields.size} classes")
       accessedFields.foreach { f => logDebug("     " + f) }
 
       // List of outer (class, object) pairs, ordered from outermost to 
innermost
@@ -377,16 +342,64 @@ private[spark] object ClosureCleaner extends Logging {
 
       logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned 
+++")
     } else {
-      logDebug(s"Cleaning lambda: ${lambdaFunc.get.getImplMethodName}")
+      val lambdaProxy = maybeIndylambdaProxy.get
+      val implMethodName = lambdaProxy.getImplMethodName
+
+      logDebug(s"Cleaning indylambda closure: $implMethodName")
 
+      // capturing class is the class that declared this lambda
+      val capturingClassName = lambdaProxy.getCapturingClass.replace('/', '.')
+      val classLoader = func.getClass.getClassLoader // this is the safest 
option
       // scalastyle:off classforname
-      val captClass = 
Class.forName(lambdaFunc.get.getCapturingClass.replace('/', '.'),
-        false, Thread.currentThread.getContextClassLoader)
+      val capturingClass = Class.forName(capturingClassName, false, 
classLoader)
       // scalastyle:on classforname
+
       // Fail fast if we detect return statements in closures
-      getClassReader(captClass)
-        .accept(new 
ReturnStatementFinder(Some(lambdaFunc.get.getImplMethodName)), 0)
-      logDebug(s" +++ Lambda closure (${lambdaFunc.get.getImplMethodName}) is 
now cleaned +++")
+      val capturingClassReader = getClassReader(capturingClass)
+      capturingClassReader.accept(new 
ReturnStatementFinder(Option(implMethodName)), 0)
+
+      val isClosureDeclaredInScalaRepl = 
capturingClassName.startsWith("$line") &&
+        capturingClassName.endsWith("$iw")
+      val outerThisOpt = if (lambdaProxy.getCapturedArgCount > 0) {
+        Option(lambdaProxy.getCapturedArg(0))
+      } else {
+        None
+      }
+
+      // only need to clean when there is an enclosing "this" captured by the 
closure, and it
+      // should be something cleanable, i.e. a Scala REPL line object
+      val needsCleaning = isClosureDeclaredInScalaRepl &&
+        outerThisOpt.isDefined && outerThisOpt.get.getClass.getName == 
capturingClassName
+
+      if (needsCleaning) {
+        // indylambda closures do not reference enclosing closures via an 
`$outer` chain, so no
+        // transitive cleaning on the `$outer` chain is needed.
+        // Thus clean() shouldn't be recursively called with a non-empty 
accessedFields.
+        assert(accessedFields.isEmpty)
+
+        initAccessedFields(accessedFields, Seq(capturingClass))
+        IndylambdaScalaClosures.findAccessedFields(
+          lambdaProxy, classLoader, accessedFields, cleanTransitively)
+
+        logDebug(s" + fields accessed by starting closure: 
${accessedFields.size} classes")
+        accessedFields.foreach { f => logDebug("     " + f) }
+
+        if (accessedFields(capturingClass).size < 
capturingClass.getDeclaredFields.length) {
+          // clone and clean the enclosing `this` only when there are fields 
to null out
+
+          val outerThis = outerThisOpt.get
+
+          logDebug(s" + cloning instance of REPL class $capturingClassName")
+          val clonedOuterThis = cloneAndSetFields(
+            parent = null, outerThis, capturingClass, accessedFields)
+
+          val outerField = func.getClass.getDeclaredField("arg$1")
+          outerField.setAccessible(true)
+          outerField.set(func, clonedOuterThis)
+        }
+      }
+
+      logDebug(s" +++ indylambda closure ($implMethodName) is now cleaned +++")
     }
 
     if (checkSerializable) {
@@ -421,6 +434,321 @@ private[spark] object ClosureCleaner extends Logging {
   }
 }
 
+private[spark] object IndylambdaScalaClosures extends Logging {
+  private val isScala2_11 = 
scala.util.Properties.versionString.contains("2.11")
+
+  // internal name of java.lang.invoke.LambdaMetafactory
+  val LambdaMetafactoryClassName = "java/lang/invoke/LambdaMetafactory"
+  // the method that Scala indylambda use for bootstrap method
+  val LambdaMetafactoryMethodName = "altMetafactory"
+  val LambdaMetafactoryMethodDesc = "(Ljava/lang/invoke/MethodHandles$Lookup;" 
+
+    "Ljava/lang/String;Ljava/lang/invoke/MethodType;[Ljava/lang/Object;)" +
+    "Ljava/lang/invoke/CallSite;"
+
+  /**
+   * Check if the given reference is a indylambda style Scala closure.
+   * If so (e.g. for Scala 2.12+ closures), return a non-empty serialization 
proxy
+   * (SerializedLambda) of the closure;
+   * otherwise (e.g. for Scala 2.11 closures) return None.
+   *
+   * @param maybeClosure the closure to check.
+   */
+  def getSerializationProxy(maybeClosure: AnyRef): Option[SerializedLambda] = {
+    if (isScala2_11) {
+      // Keep existing behavior in Spark 2.4: assume Scala 2.11 doesn't use 
indylambda.
+      // NOTE: It's actually possible to turn on indylambda in Scala 2.11 via 
delambdafy:inline,
+      //       but that's not the default and we don't expect it to be in use.
+      return None
+    }
+
+    def isClosureCandidate(cls: Class[_]): Boolean = {
+      // TODO: maybe lift this restriction to support other functional 
interfaces in the future
+      val implementedInterfaces = ClassUtils.getAllInterfaces(cls).asScala
+      implementedInterfaces.exists(_.getName.startsWith("scala.Function"))
+    }
+
+    maybeClosure.getClass match {
+      // shortcut the fast check:
+      // 1. indylambda closure classes are generated by Java's 
LambdaMetafactory, and they're
+      //    always synthetic.
+      // 2. We only care about Serializable closures, so let's check that as 
well
+      case c if !c.isSynthetic || !maybeClosure.isInstanceOf[Serializable] => 
None
+
+      case c if isClosureCandidate(c) =>
+        try {
+          Option(inspect(maybeClosure)).filter(isIndylambdaScalaClosure)
+        } catch {
+          case e: Exception =>
+            logDebug("The given reference is not an indylambda Scala 
closure.", e)
+            None
+        }
+
+      case _ => None
+    }
+  }
+
+  def isIndylambdaScalaClosure(lambdaProxy: SerializedLambda): Boolean = {
+    lambdaProxy.getImplMethodKind == MethodHandleInfo.REF_invokeStatic &&
+      lambdaProxy.getImplMethodName.contains("$anonfun$")
+  }
+
+  def inspect(closure: AnyRef): SerializedLambda = {
+    val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
+    writeReplace.setAccessible(true)
+    writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
+  }
+
+  /**
+   * Check if the handle represents the LambdaMetafactory that indylambda 
Scala closures
+   * use for creating the lambda class and getting a closure instance.
+   */
+  def isLambdaMetafactory(bsmHandle: Handle): Boolean = {
+    bsmHandle.getOwner == LambdaMetafactoryClassName &&
+      bsmHandle.getName == LambdaMetafactoryMethodName &&
+      bsmHandle.getDesc == LambdaMetafactoryMethodDesc
+  }
+
+  /**
+   * Check if the handle represents a target method that is:
+   * - a STATIC method that implements a Scala lambda body in the indylambda 
style
+   * - captures the enclosing `this`, i.e. the first argument is a reference 
to the same type as
+   *   the owning class.
+   * Returns true if both criteria above are met.
+   */
+  def isLambdaBodyCapturingOuter(handle: Handle, ownerInternalName: String): 
Boolean = {
+    handle.getTag == H_INVOKESTATIC &&
+      handle.getName.contains("$anonfun$") &&
+      handle.getOwner == ownerInternalName &&
+      handle.getDesc.startsWith(s"(L$ownerInternalName;")
+  }
+
+  /**
+   * Check if the callee of a call site is a inner class constructor.
+   * - A constructor has to be invoked via INVOKESPECIAL
+   * - A constructor's internal name is "&lt;init&gt;" and the return type is 
"V" (void)
+   * - An inner class' first argument in the signature has to be a reference 
to the
+   *   enclosing "this", aka `$outer` in Scala.
+   */
+  def isInnerClassCtorCapturingOuter(
+      op: Int, name: String, desc: String, callerInternalName: String): 
Boolean = {
+    op == INVOKESPECIAL && name == "<init>" && 
desc.startsWith(s"(L$callerInternalName;")
+  }
+
+  /**
+   * Scans an indylambda Scala closure, along with its lexically nested 
closures, and populate
+   * the accessed fields info on which fields on the outer object are accessed.
+   *
+   * This is equivalent to getInnerClosureClasses() + InnerClosureFinder + 
FieldAccessFinder fused
+   * into one for processing indylambda closures. The traversal order along 
the call graph is the
+   * same for all three combined, so they can be fused together easily while 
maintaining the same
+   * ordering as the existing implementation.
+   *
+   * Precondition: this function expects the `accessedFields` to be populated 
with all known
+   *               outer classes and their super classes to be in the map as 
keys, e.g.
+   *               initializing via ClosureCleaner.initAccessedFields.
+   */
+  // scalastyle:off line.size.limit
+  // Example: run the following code snippet in a Spark Shell w/ Scala 2.12+:
+  //   val topLevelValue = "someValue"; val closure = (j: Int) => {
+  //     class InnerFoo {
+  //       val innerClosure = (x: Int) => (1 to x).map { y => y + 
topLevelValue }
+  //     }
+  //     val innerFoo = new InnerFoo
+  //     (1 to j).flatMap(innerFoo.innerClosure)
+  //   }
+  //   sc.parallelize(0 to 2).map(closure).collect
+  //
+  // produces the following trace-level logs:
+  // (slightly simplified:
+  //   - omitting the "ignoring ..." lines;
+  //   - "$iw" is actually "$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw";
+  //   - "invokedynamic" lines are simplified to just show the name+desc, 
omitting the bsm info)
+  //   Cleaning indylambda closure: $anonfun$closure$1$adapted
+  //     scanning 
$iw.$anonfun$closure$1$adapted(L$iw;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;
+  //       found intra class call to 
$iw.$anonfun$closure$1(L$iw;I)Lscala/collection/immutable/IndexedSeq;
+  //     scanning 
$iw.$anonfun$closure$1(L$iw;I)Lscala/collection/immutable/IndexedSeq;
+  //       found inner class $iw$InnerFoo$1
+  //         found method innerClosure()Lscala/Function1;
+  //         found method 
$anonfun$innerClosure$2(L$iw$InnerFoo$1;I)Ljava/lang/String;
+  //         found method 
$anonfun$innerClosure$1(L$iw$InnerFoo$1;I)Lscala/collection/immutable/IndexedSeq;
+  //         found method <init>(L$iw;)V
+  //         found method 
$anonfun$innerClosure$2$adapted(L$iw$InnerFoo$1;Ljava/lang/Object;)Ljava/lang/String;
+  //         found method 
$anonfun$innerClosure$1$adapted(L$iw$InnerFoo$1;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;
+  //         found method 
$deserializeLambda$(Ljava/lang/invoke/SerializedLambda;)Ljava/lang/Object;
+  //       found call to outer $iw$InnerFoo$1.innerClosure()Lscala/Function1;
+  //     scanning $iw$InnerFoo$1.innerClosure()Lscala/Function1;
+  //     scanning 
$iw$InnerFoo$1.$deserializeLambda$(Ljava/lang/invoke/SerializedLambda;)Ljava/lang/Object;
+  //       invokedynamic: 
lambdaDeserialize(Ljava/lang/invoke/SerializedLambda;)Ljava/lang/Object;, 
bsm...)
+  //     scanning 
$iw$InnerFoo$1.$anonfun$innerClosure$1$adapted(L$iw$InnerFoo$1;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;
+  //       found intra class call to 
$iw$InnerFoo$1.$anonfun$innerClosure$1(L$iw$InnerFoo$1;I)Lscala/collection/immutable/IndexedSeq;
+  //     scanning 
$iw$InnerFoo$1.$anonfun$innerClosure$1(L$iw$InnerFoo$1;I)Lscala/collection/immutable/IndexedSeq;
+  //       invokedynamic: apply(L$iw$InnerFoo$1;)Lscala/Function1;, bsm...)
+  //       found inner closure 
$iw$InnerFoo$1.$anonfun$innerClosure$2$adapted(L$iw$InnerFoo$1;Ljava/lang/Object;)Ljava/lang/String;
 (6)
+  //     scanning 
$iw$InnerFoo$1.$anonfun$innerClosure$2$adapted(L$iw$InnerFoo$1;Ljava/lang/Object;)Ljava/lang/String;
+  //       found intra class call to 
$iw$InnerFoo$1.$anonfun$innerClosure$2(L$iw$InnerFoo$1;I)Ljava/lang/String;
+  //     scanning 
$iw$InnerFoo$1.$anonfun$innerClosure$2(L$iw$InnerFoo$1;I)Ljava/lang/String;
+  //       found call to outer $iw.topLevelValue()Ljava/lang/String;
+  //     scanning $iw.topLevelValue()Ljava/lang/String;
+  //       found field access topLevelValue on $iw
+  //     scanning 
$iw$InnerFoo$1.$anonfun$innerClosure$2$adapted(L$iw$InnerFoo$1;Ljava/lang/Object;)Ljava/lang/String;
+  //       found intra class call to 
$iw$InnerFoo$1.$anonfun$innerClosure$2(L$iw$InnerFoo$1;I)Ljava/lang/String;
+  //     scanning $iw$InnerFoo$1.<init>(L$iw;)V
+  //       invokedynamic: apply(L$iw$InnerFoo$1;)Lscala/Function1;, bsm...)
+  //       found inner closure 
$iw$InnerFoo$1.$anonfun$innerClosure$1$adapted(L$iw$InnerFoo$1;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;
 (6)
+  //     scanning 
$iw$InnerFoo$1.$anonfun$innerClosure$1(L$iw$InnerFoo$1;I)Lscala/collection/immutable/IndexedSeq;
+  //       invokedynamic: apply(L$iw$InnerFoo$1;)Lscala/Function1;, bsm...)
+  //       found inner closure 
$iw$InnerFoo$1.$anonfun$innerClosure$2$adapted(L$iw$InnerFoo$1;Ljava/lang/Object;)Ljava/lang/String;
 (6)
+  //     scanning 
$iw$InnerFoo$1.$anonfun$innerClosure$2(L$iw$InnerFoo$1;I)Ljava/lang/String;
+  //       found call to outer $iw.topLevelValue()Ljava/lang/String;
+  //     scanning $iw$InnerFoo$1.innerClosure()Lscala/Function1;
+  //    + fields accessed by starting closure: 2 classes
+  //        (class java.lang.Object,Set())
+  //        (class $iw,Set(topLevelValue))
+  //    + cloning instance of REPL class $iw
+  //    +++ indylambda closure ($anonfun$closure$1$adapted) is now cleaned +++
+  //
+  // scalastyle:on line.size.limit
+  def findAccessedFields(
+      lambdaProxy: SerializedLambda,
+      lambdaClassLoader: ClassLoader,
+      accessedFields: Map[Class[_], Set[String]],
+      findTransitively: Boolean): Unit = {
+
+    // We may need to visit the same class multiple times for different 
methods on it, and we'll
+    // need to lookup by name. So we use ASM's Tree API and cache the 
ClassNode/MethodNode.
+    val classInfoByInternalName = Map.empty[String, (Class[_], ClassNode)]
+    val methodNodeById = Map.empty[MethodIdentifier[_], MethodNode]
+    def getOrUpdateClassInfo(classInternalName: String): (Class[_], ClassNode) 
= {
+      val classInfo = 
classInfoByInternalName.getOrElseUpdate(classInternalName, {
+        val classExternalName = classInternalName.replace('/', '.')
+        // scalastyle:off classforname
+        val clazz = Class.forName(classExternalName, false, lambdaClassLoader)
+        // scalastyle:on classforname
+        val classNode = new ClassNode()
+        val classReader = ClosureCleaner.getClassReader(clazz)
+        classReader.accept(classNode, 0)
+
+        for (m <- classNode.methods.asScala) {
+          methodNodeById(MethodIdentifier(clazz, m.name, m.desc)) = m
+        }
+
+        (clazz, classNode)
+      })
+      classInfo
+    }
+
+    val implClassInternalName = lambdaProxy.getImplClass
+    val (implClass, _) = getOrUpdateClassInfo(implClassInternalName)
+
+    val implMethodId = MethodIdentifier(
+      implClass, lambdaProxy.getImplMethodName, 
lambdaProxy.getImplMethodSignature)
+
+    // The set of internal names of classes that we would consider following the calls into.
+    // Candidates are: known outer class which happens to be the starting closure's impl class,
+    // and all inner classes discovered below.
+    // Note that code in an inner class can make calls to methods in any of its enclosing classes,
+    // e.g.
+    //   starting closure (in class T)
+    //     inner class A
+    //        inner class B
+    //          inner closure
+    // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
+    // to better find and track field accesses.
+    val trackedClassInternalNames = Set[String](implClassInternalName)
+
+    // Depth-first search for inner closures and track the fields that were accessed in them.
+    // Start from the lambda body's implementation method, follow method invocations
+    val visited = Set.empty[MethodIdentifier[_]]
+    val stack = Stack[MethodIdentifier[_]](implMethodId)
+    def pushIfNotVisited(methodId: MethodIdentifier[_]): Unit = {
+      if (!visited.contains(methodId)) {
+        stack.push(methodId)
+      }
+    }
+
+    while (!stack.isEmpty) {
+      val currentId = stack.pop
+      visited += currentId
+
+      val currentClass = currentId.cls
+      val currentMethodNode = methodNodeById(currentId)
+      logTrace(s"  scanning ${currentId.cls.getName}.${currentId.name}${currentId.desc}")
+      currentMethodNode.accept(new MethodVisitor(ASM6) {
+        val currentClassName = currentClass.getName
+        val currentClassInternalName = currentClassName.replace('.', '/')
+
+        // Find and update the accessedFields info. Only fields on known outer classes are tracked.
+        // This is the FieldAccessFinder equivalent.
+        override def visitFieldInsn(op: Int, owner: String, name: String, desc: String): Unit = {
+          if (op == GETFIELD || op == PUTFIELD) {
+            val ownerExternalName = owner.replace('/', '.')
+            for (cl <- accessedFields.keys if cl.getName == ownerExternalName) {
+              logTrace(s"    found field access $name on $ownerExternalName")
+              accessedFields(cl) += name
+            }
+          }
+        }
+
+        override def visitMethodInsn(
+            op: Int, owner: String, name: String, desc: String, itf: Boolean): Unit = {
+          val ownerExternalName = owner.replace('/', '.')
+          if (owner == currentClassInternalName) {
+            logTrace(s"    found intra class call to $ownerExternalName.$name$desc")
+            // could be invoking a helper method or a field accessor method, just follow it.
+            pushIfNotVisited(MethodIdentifier(currentClass, name, desc))
+          } else if (isInnerClassCtorCapturingOuter(op, name, desc, currentClassInternalName)) {
+            // Discover inner classes.
+            // This is the InnerClassFinder equivalent for inner classes, which still use the
+            // `$outer` chain. So this is NOT controlled by the `findTransitively` flag.
+            logDebug(s"    found inner class $ownerExternalName")
+            // val innerClassInfo = getOrUpdateClassInfo(owner)
+            val (innerClass, innerClassNode) = getOrUpdateClassInfo(owner)
+            // val innerClass = innerClassInfo._1
+            // val innerClassNode = innerClassInfo._2
+            trackedClassInternalNames += owner
+            // We need to visit all methods on the inner class so that we don't miss anything.
+            for (m <- innerClassNode.methods.asScala) {
+              logTrace(s"      found method ${m.name}${m.desc}")
+              pushIfNotVisited(MethodIdentifier(innerClass, m.name, m.desc))
+            }
+          } else if (findTransitively && trackedClassInternalNames.contains(owner)) {
+            logTrace(s"    found call to outer $ownerExternalName.$name$desc")
+            val (calleeClass, _) = getOrUpdateClassInfo(owner) // make sure MethodNodes are cached
+            pushIfNotVisited(MethodIdentifier(calleeClass, name, desc))
+          } else {
+            // keep the same behavior as the original ClosureCleaner
+            logTrace(s"    ignoring call to $ownerExternalName.$name$desc")
+          }
+        }
+
+        // Find the lexically nested closures
+        // This is the InnerClosureFinder equivalent for indylambda nested closures
+        override def visitInvokeDynamicInsn(
+            name: String, desc: String, bsmHandle: Handle, bsmArgs: Object*): Unit = {
+          logTrace(s"    invokedynamic: $name$desc, bsmHandle=$bsmHandle, bsmArgs=$bsmArgs")
+
+          // fast check: we only care about Scala lambda creation
+          // TODO: maybe lift this restriction and support other functional interfaces
+          if (!name.startsWith("apply")) return
+          if (!Type.getReturnType(desc).getDescriptor.startsWith("Lscala/Function")) return
+
+          if (isLambdaMetafactory(bsmHandle)) {
+            // OK we're in the right bootstrap method for serializable Java 8 style lambda creation
+            val targetHandle = bsmArgs(1).asInstanceOf[Handle]
+            if (isLambdaBodyCapturingOuter(targetHandle, currentClassInternalName)) {
+              // this is a lexically nested closure that also captures the enclosing `this`
+              logDebug(s"    found inner closure $targetHandle")
+              val calleeMethodId =
+                MethodIdentifier(currentClass, targetHandle.getName, targetHandle.getDesc)
+              pushIfNotVisited(calleeMethodId)
+            }
+          }
+        }
+      })
+    }
+  }
+}
+
 private[spark] class ReturnStatementInClosureException
   extends SparkException("Return statements aren't allowed in Spark closures")
 
@@ -429,7 +757,7 @@ private class ReturnStatementFinder(targetMethodName: Option[String] = None)
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {
 
-    // $anonfun$ covers Java 8 lambdas
+    // $anonfun$ covers indylambda closures
     if (name.contains("apply") || name.contains("$anonfun$")) {
       // A method with suffix "$adapted" will be generated in cases like
       // { _:Int => return; Seq()} but not { _:Int => return; true}
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 3c66608..ebe39ab 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -123,7 +123,6 @@ class ClosureCleanerSuite extends SparkFunSuite {
   }
 
   test("SPARK-22328: ClosureCleaner misses referenced superclass fields: case 1") {
-    assume(!ClosureCleanerSuite2.supportsLMFs)
     val concreteObject = new TestAbstractClass {
       val n2 = 222
       val s2 = "bbb"
@@ -144,7 +143,6 @@ class ClosureCleanerSuite extends SparkFunSuite {
   }
 
   test("SPARK-22328: ClosureCleaner misses referenced superclass fields: case 2") {
-    assume(!ClosureCleanerSuite2.supportsLMFs)
     val concreteObject = new TestAbstractClass2 {
       val n2 = 222
       val s2 = "bbb"
@@ -158,7 +156,6 @@ class ClosureCleanerSuite extends SparkFunSuite {
   }
 
   test("SPARK-22328: multiple outer classes have the same parent class") {
-    assume(!ClosureCleanerSuite2.supportsLMFs)
     val concreteObject = new TestAbstractClass2 {
 
       val innerObject = new TestAbstractClass2 {
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
index 96da8ec..214b41e5 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
@@ -575,7 +575,7 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
     test6()()()
   }
 
-  test("verify nested non-LMF closures") {
+  test("verify nested LMF closures") {
     assume(ClosureCleanerSuite2.supportsLMFs)
     class A1(val f: Int => Int)
     class A2(val f: Int => Int => Int)
@@ -594,6 +594,6 @@ object ClosureCleanerSuite2 {
   // Scala 2.12 allows better interop with Java 8 via lambda syntax. This is supported
   // by implementing FunctionN classes in Scala’s standard library as Single Abstract
   // Method (SAM) types. Lambdas are implemented via the invokedynamic instruction and
-  // the use of the LambdaMwtaFactory (LMF) machanism.
+  // the use of the LambdaMetaFactory (LMF) mechanism; aka "indylambda".
   val supportsLMFs = scala.util.Properties.versionString.contains("2.12")
 }
diff --git a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
index d49e0fd..659e73e 100644
--- a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
@@ -395,6 +395,67 @@ class SingletonReplSuite extends SparkFunSuite {
     assertDoesNotContain("Exception", output)
   }
 
+  test("SPARK-31399: should clone+clean line object w/ non-serializable state in ClosureCleaner") {
+    // Test ClosureCleaner when a closure captures the enclosing `this` REPL line object, and that
+    // object contains an unused non-serializable field.
+    // Specifically, the closure in this test case contains a directly nested closure, and the
+    // capture is triggered by the inner closure.
+    // `ns` should be nulled out, but `topLevelValue` should stay intact.
+
+    // Can't use :paste mode because PipedOutputStream/PipedInputStream doesn't work well with the
+    // EOT control character (i.e. Ctrl+D).
+    // Just write things on a single line to emulate :paste mode.
+
+    // NOTE: in order for this test case to trigger the intended scenario, the following three
+    //       variables need to be in the same "input", which will make the REPL pack them into the
+    //       same REPL line object:
+    //         - ns: a non-serializable state, not accessed by the closure;
+    //         - topLevelValue: a serializable state, accessed by the closure;
+    //         - closure: the starting closure, captures the enclosing REPL line object.
+    val output = runInterpreter(
+      """
+        |class NotSerializableClass(val x: Int)
+        |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure =
+        |(j: Int) => {
+        |  (1 to j).flatMap { x =>
+        |    (1 to x).map { y => y + topLevelValue }
+        |  }
+        |}
+        |val r = sc.parallelize(0 to 2).map(closure).collect
+      """.stripMargin)
+    assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] = " +
+      "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("SPARK-31399: ClosureCleaner should discover indirectly nested closure in inner class") {
+    // Similar to the previous test case, but with indirect closure nesting instead.
+    // There are still nested closures involved, but the inner closure is indirectly nested in the
+    // outer closure, with a level of inner class in between them.
+    // This changes how the inner closure references/captures the outer closure/enclosing `this`
+    // REPL line object, and covers a different code path in inner closure discovery.
+
+    // `ns` should be nulled out, but `topLevelValue` should stay intact.
+
+    val output = runInterpreter(
+      """
+        |class NotSerializableClass(val x: Int)
+        |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure =
+        |(j: Int) => {
+        |  class InnerFoo {
+        |    val innerClosure = (x: Int) => (1 to x).map { y => y + topLevelValue }
+        |  }
+        |  val innerFoo = new InnerFoo
+        |  (1 to j).flatMap(innerFoo.innerClosure)
+        |}
+        |val r = sc.parallelize(0 to 2).map(closure).collect
+      """.stripMargin)
+    assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] = " +
+       "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output)
+    assertDoesNotContain("Array(Vector(), Vector(1null), Vector(1null, 1null, 2null)", output)
+    assertDoesNotContain("Exception", output)
+  }
+
   test("newProductSeqEncoder with REPL defined class") {
     val output = runInterpreter(
       """

