This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a09b5c945235 [SPARK-53226][SPARK-52088][CORE] Make ClosureCleaner work 
with Java22+
a09b5c945235 is described below

commit a09b5c94523544e6b419808b86b3d966b2e97c74
Author: Kousuke Saruta <[email protected]>
AuthorDate: Tue Mar 3 08:11:52 2026 +0800

    [SPARK-53226][SPARK-52088][CORE] Make ClosureCleaner work with Java22+
    
    ### What changes were proposed in this pull request?
    This PR proposes to change `ClosureCleaner` to work with Java 22+.
    The current `ClosureCleaner` doesn't work with Java 22. For example, the 
following code fails.
    ```
    val x = 100
    sc.parallelize(1 to 10).map(v => v + x).collect
    
    java.lang.InternalError: java.lang.IllegalAccessException: final field has 
no write access: $Lambda/0x00001c0001bae838.arg$1/putField, from class 
java.lang.Object (module java.base)
      at 
java.base/jdk.internal.reflect.MethodHandleAccessorFactory.newFieldAccessor(MethodHandleAccessorFactory.java:207)
      at 
java.base/jdk.internal.reflect.ReflectionFactory.newFieldAccessor(ReflectionFactory.java:144)
      at 
java.base/java.lang.reflect.Field.acquireOverrideFieldAccessor(Field.java:1200)
      at 
java.base/java.lang.reflect.Field.getOverrideFieldAccessor(Field.java:1169)
      at java.base/java.lang.reflect.Field.set(Field.java:836)
      at 
org.apache.spark.util.ClosureCleaner$.setFieldAndIgnoreModifiers(ClosureCleaner.scala:563)
      at 
org.apache.spark.util.ClosureCleaner$.cleanupScalaReplClosure(ClosureCleaner.scala:431)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:256)
      at 
org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:39)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2844)
      at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:425)
      at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
      at org.apache.spark.rdd.RDD.map(RDD.scala:424)
      ... 38 elided
    Caused by: java.lang.IllegalAccessException: final field has no write 
access: $Lambda/0x00001c0001bae838.arg$1/putField, from class java.lang.Object 
(module java.base)
      at 
java.base/java.lang.invoke.MemberName.makeAccessException(MemberName.java:889)
      at 
java.base/java.lang.invoke.MethodHandles$Lookup.unreflectField(MethodHandles.java:3609)
      at 
java.base/java.lang.invoke.MethodHandles$Lookup.unreflectSetter(MethodHandles.java:3600)
      at 
java.base/java.lang.invoke.MethodHandleImpl$1.unreflectField(MethodHandleImpl.java:1619)
      at 
java.base/jdk.internal.reflect.MethodHandleAccessorFactory.newFieldAccessor(MethodHandleAccessorFactory.java:185)
      ... 52 more
    ```
    
    The reason is that, as of Java 22, final fields cannot be modified even 
via reflection, because of [JEP 416](https://openjdk.org/jeps/416).
    The current `ClosureCleaner` tries to overwrite the final field `arg$1` with a 
cloned and cleaned object, so this step fails.
    
    At first I considered two solutions:
    
    1. Using the Unsafe API
    2. Using the `--enable-final-field-mutation` option, which is expected to be 
introduced by [JEP 500](https://openjdk.org/jeps/500)
    
    But neither of them can resolve the issue, because final fields of hidden 
classes cannot be modified, and lambdas created internally by the JVM using the 
`invokedynamic` instruction are hidden classes (let's call such a lambda an indy 
lambda).
    
    So this PR resolves the issue by cloning indy lambdas with a cleaned `arg$1`, 
using `LambdaMetafactory` together with the impl method of the original lambda.
    
    ### Why are the changes needed?
    To make Spark work with Java 22+.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Tested on my customized GA job with Java 22.
    https://github.com/sarutak/spark/actions/runs/19331525569
    
    All the failed tests are related to `datasketch`, which is a [separate 
issue](https://issues.apache.org/jira/browse/SPARK-53327).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52956 from sarutak/spark-shell-java22.
    
    Authored-by: Kousuke Saruta <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../org/apache/spark/util/ClosureCleaner.scala     | 163 +++++++++++++++++----
 .../main/scala/org/apache/spark/SparkContext.scala |   1 -
 .../apache/spark/util/SparkClosureCleaner.scala    |  16 +-
 .../apache/spark/util/ClosureCleanerSuite2.scala   |   5 +-
 .../apache/spark/sql/connect/UdfToProtoUtils.scala |   7 +-
 .../org/apache/spark/streaming/StateSpec.scala     |  10 +-
 6 files changed, 159 insertions(+), 43 deletions(-)

diff --git 
a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala 
b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 4f4233e96e8c..b23d13c32a10 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.util
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-import java.lang.invoke.{MethodHandleInfo, SerializedLambda}
+import java.lang.invoke.{LambdaMetafactory, MethodHandle, MethodHandleInfo, 
MethodHandles, MethodType, SerializedLambda}
 import java.lang.reflect.{Field, Modifier}
 
-import scala.collection.mutable.{Map, Queue, Set, Stack}
+import scala.collection.mutable.{ArrayBuffer, Map, Queue, Set, Stack}
 import scala.jdk.CollectionConverters._
 
 import org.apache.xbean.asm9.{ClassReader, ClassVisitor, Handle, 
MethodVisitor, Opcodes, Type}
@@ -189,11 +189,18 @@ private[spark] object ClosureCleaner extends Logging {
    * @param cleanTransitively whether to clean enclosing closures transitively
    * @param accessedFields    a map from a class to a set of its fields that 
are accessed by
    *                          the starting closure
+   * @return Some(cleaned closure) if the closure was cleaned, None otherwise.
+   *         On the clone-based path (Java 22+ or when explicitly enabled), 
the returned
+   *         closure is a new lambda instance with cleaned outer references.
+   *         On the legacy mutation path, the closure is mutated in place and
+   *         Some with the same reference is returned.
    */
-  private[spark] def clean(
-      func: AnyRef,
+  private[spark] def clean[F <: AnyRef](
+      func: F,
       cleanTransitively: Boolean,
-      accessedFields: Map[Class[_], Set[String]]): Boolean = {
+      accessedFields: Map[Class[_], Set[String]]): Option[F] = {
+    var cleanedFunc: F = func
+
     // indylambda check. Most likely to be the case with 2.12, 2.13
     // so we check first
     // non LMF-closures should be less frequent from now on
@@ -201,14 +208,14 @@ private[spark] object ClosureCleaner extends Logging {
 
     if (!isClosure(func.getClass) && maybeIndylambdaProxy.isEmpty) {
       logDebug(s"Expected a closure; got ${func.getClass.getName}")
-      return false
+      return None
     }
 
     // TODO: clean all inner closures first. This requires us to find the 
inner objects.
     // TODO: cache outerClasses / innerClasses / accessedFields
 
     if (func == null) {
-      return false
+      return None
     }
 
     if (maybeIndylambdaProxy.isEmpty) {
@@ -232,9 +239,9 @@ private[spark] object ClosureCleaner extends Logging {
 
       val outerThis = if (lambdaProxy.getCapturedArgCount > 0) {
         // only need to clean when there is an enclosing non-null "this" 
captured by the closure
-        Option(lambdaProxy.getCapturedArg(0)).getOrElse(return false)
+        Option(lambdaProxy.getCapturedArg(0)).getOrElse(return None)
       } else {
-        return false
+        return None
       }
 
       // clean only if enclosing "this" is something cleanable, i.e. a Scala 
REPL line object or
@@ -244,22 +251,28 @@ private[spark] object ClosureCleaner extends Logging {
       if (isDefinedInAmmonite(outerThis.getClass)) {
         // If outerThis is a lambda, we have to clean that instead
         IndylambdaScalaClosures.getSerializationProxy(outerThis).foreach { _ =>
-          return clean(outerThis, cleanTransitively, accessedFields)
+          val cleanedOuterThis = clean(outerThis, cleanTransitively, 
accessedFields)
+          if (cleanedOuterThis.isEmpty) {
+            return None
+          } else {
+            return Some(
+              cloneIndyLambda(func, cleanedOuterThis.get, 
lambdaProxy).getOrElse(func))
+          }
         }
-        cleanupAmmoniteReplClosure(func, lambdaProxy, outerThis, 
cleanTransitively)
+        cleanedFunc = cleanupAmmoniteReplClosure(func, lambdaProxy, outerThis, 
cleanTransitively)
       } else {
         val isClosureDeclaredInScalaRepl = 
capturingClassName.startsWith("$line") &&
           capturingClassName.endsWith("$iw")
         if (isClosureDeclaredInScalaRepl && outerThis.getClass.getName == 
capturingClassName) {
           assert(accessedFields.isEmpty)
-          cleanupScalaReplClosure(func, lambdaProxy, outerThis, 
cleanTransitively)
+          cleanedFunc = cleanupScalaReplClosure(func, lambdaProxy, outerThis, 
cleanTransitively)
         }
       }
 
       logDebug(s" +++ indylambda closure ($implMethodName) is now cleaned +++")
     }
 
-    true
+    Some(cleanedFunc)
   }
 
   /**
@@ -395,11 +408,11 @@ private[spark] object ClosureCleaner extends Logging {
    * @param outerThis lambda enclosing class
    * @param cleanTransitively whether to clean enclosing closures transitively
    */
-  private def cleanupScalaReplClosure(
-      func: AnyRef,
+  private def cleanupScalaReplClosure[F <: AnyRef](
+      func: F,
       lambdaProxy: SerializedLambda,
       outerThis: AnyRef,
-      cleanTransitively: Boolean): Unit = {
+      cleanTransitively: Boolean): F = {
 
     val capturingClass = outerThis.getClass
     val accessedFields: Map[Class[_], Set[String]] = Map.empty
@@ -421,13 +434,17 @@ private[spark] object ClosureCleaner extends Logging {
       logDebug(s" + cloning instance of REPL class ${capturingClass.getName}")
       val clonedOuterThis = cloneAndSetFields(
         parent = null, outerThis, capturingClass, accessedFields)
-
-      val outerField = func.getClass.getDeclaredField("arg$1")
-      // SPARK-37072: When Java 17 is used and `outerField` is read-only,
-      // the content of `outerField` cannot be set by reflect api directly.
-      // But we can remove the `final` modifier of `outerField` before set 
value
-      // and reset the modifier after set value.
-      setFieldAndIgnoreModifiers(func, outerField, clonedOuterThis)
+      cloneIndyLambda(func, clonedOuterThis, lambdaProxy).getOrElse {
+        val outerField = func.getClass.getDeclaredField("arg$1")
+        // SPARK-37072: When Java 17 is used and `outerField` is read-only,
+        // the content of `outerField` cannot be set by reflect api directly.
+        // But we can remove the `final` modifier of `outerField` before set 
value
+        // and reset the modifier after set value.
+        setFieldAndIgnoreModifiers(func, outerField, clonedOuterThis)
+        func
+      }
+    } else {
+      func
     }
   }
 
@@ -456,11 +473,11 @@ private[spark] object ClosureCleaner extends Logging {
    * @param outerThis         lambda enclosing class
    * @param cleanTransitively whether to clean enclosing closures transitively
    */
-  private def cleanupAmmoniteReplClosure(
-      func: AnyRef,
+  private def cleanupAmmoniteReplClosure[F <: AnyRef](
+      func: F,
       lambdaProxy: SerializedLambda,
       outerThis: AnyRef,
-      cleanTransitively: Boolean): Unit = {
+      cleanTransitively: Boolean): F = {
 
     val accessedFields: Map[Class[_], Set[String]] = Map.empty
     initAccessedFields(accessedFields, Seq(outerThis.getClass))
@@ -549,9 +566,12 @@ private[spark] object ClosureCleaner extends Logging {
       cmdClones(outerThis.getClass)
     }
 
-    val outerField = func.getClass.getDeclaredField("arg$1")
-    // update lambda capturing class reference
-    setFieldAndIgnoreModifiers(func, outerField, outerThisClone)
+    cloneIndyLambda(func, outerThisClone, lambdaProxy).getOrElse {
+      val outerField = func.getClass.getDeclaredField("arg$1")
+      // update lambda capturing class reference
+      setFieldAndIgnoreModifiers(func, outerField, outerThisClone)
+      func
+    }
   }
 
   private def setFieldAndIgnoreModifiers(obj: AnyRef, field: Field, value: 
AnyRef): Unit = {
@@ -595,6 +615,91 @@ private[spark] object ClosureCleaner extends Logging {
     }
     obj
   }
+
+  private def cloneIndyLambda[F <: AnyRef](
+      indyLambda: F,
+      outerThis: AnyRef,
+      lambdaProxy: SerializedLambda): Option[F] = {
+    val javaVersion = Runtime.version().feature()
+    val useClone = 
System.getProperty("spark.cloneBasedClosureCleaner.enabled") == "true" ||
+      System.getenv("SPARK_CLONE_BASED_CLOSURE_CLEANER") == "1" || javaVersion 
>= 22
+
+    if (useClone) {
+      val factory = makeClonedIndyLambdaFactory(indyLambda.getClass, 
lambdaProxy)
+
+      val argsBuffer = new ArrayBuffer[Object]()
+      var i = 0
+      while (i < lambdaProxy.getCapturedArgCount) {
+        val arg = lambdaProxy.getCapturedArg(i)
+        argsBuffer.append(arg)
+        i += 1
+      }
+      val clonedLambda =
+        factory.invokeWithArguments(outerThis +: argsBuffer.tail.toArray: 
_*).asInstanceOf[F]
+      Some(clonedLambda)
+    } else {
+      None
+    }
+  }
+
+  private def makeClonedIndyLambdaFactory(
+      originalFuncClass: Class[_],
+      lambdaProxy: SerializedLambda): MethodHandle = {
+    val classLoader = originalFuncClass.getClassLoader
+
+    // scalastyle:off classforname
+    val fInterface = Class.forName(
+      lambdaProxy.getFunctionalInterfaceClass.replace("/", "."), false, 
classLoader)
+    // scalastyle:on classforname
+    val numCapturedArgs = lambdaProxy.getCapturedArgCount
+    val implMethodType = MethodType.fromMethodDescriptorString(
+      lambdaProxy.getImplMethodSignature, classLoader)
+    val invokedMethodType = MethodType.methodType(
+      fInterface, (0 until numCapturedArgs).map(i => 
implMethodType.parameterType(i)).toArray)
+
+    // scalastyle:off classforname
+    val implClassName = lambdaProxy.getImplClass.replace("/", ".")
+    val implClass = Class.forName(implClassName, false, classLoader)
+    // scalastyle:on classforname
+    val replLookup = getFullPowerLookupFor(implClass)
+
+    val implMethodName = lambdaProxy.getImplMethodName
+    val implMethodHandle = replLookup.findStatic(implClass, implMethodName, 
implMethodType)
+    val funcMethodType = MethodType.fromMethodDescriptorString(
+      lambdaProxy.getFunctionalInterfaceMethodSignature, classLoader)
+    val instantiatedMethodType = MethodType.fromMethodDescriptorString(
+      lambdaProxy.getInstantiatedMethodType, classLoader)
+
+    val callSite = LambdaMetafactory.altMetafactory(
+      replLookup,
+      lambdaProxy.getFunctionalInterfaceMethodName,
+      invokedMethodType,
+      funcMethodType,
+      implMethodHandle,
+      instantiatedMethodType,
+      LambdaMetafactory.FLAG_SERIALIZABLE)
+
+    callSite.getTarget
+  }
+
+  /**
+   * This method is used for full-power lookup for `targetClass` which is used 
for cloning lambdas
+   * created at each REPL line. `targetClass` is expected the enclosing class 
of a lambda.
+   * `MethodHandles.privateLookupIn(targetClass, MethodHandles.lookup())` is 
not
+   * helpful for such use case because targetClass and `ClosureCleaner` which
+   * `MethodHandles.lookup()` calls are loaded into different class loaders, 
and the method returns
+   * a lookup which doesn't have enough privilege to create a lambda using
+   * LambdaMetaFactory.altMetafactory.
+   */
+  private def getFullPowerLookupFor(targetClass: Class[_]): 
MethodHandles.Lookup = {
+    val replLookupCtor = classOf[MethodHandles.Lookup].getDeclaredConstructor(
+      classOf[Class[_]],
+      classOf[Class[_]],
+      classOf[Int])
+    replLookupCtor.setAccessible(true)
+    // -1 means full-power.
+    replLookupCtor.newInstance(targetClass, null, -1)
+  }
 }
 
 private[spark] object IndylambdaScalaClosures extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8c92f4c10aa5..d6f8d40aa51b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2849,7 +2849,6 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = 
true): F = {
     SparkClosureCleaner.clean(f, checkSerializable)
-    f
   }
 
   /**
diff --git 
a/core/src/main/scala/org/apache/spark/util/SparkClosureCleaner.scala 
b/core/src/main/scala/org/apache/spark/util/SparkClosureCleaner.scala
index 44e0efb44941..57b5774ee40b 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkClosureCleaner.scala
@@ -31,19 +31,25 @@ private[spark] object SparkClosureCleaner {
    * @param closure           the closure to clean
    * @param checkSerializable whether to verify that the closure is 
serializable after cleaning
    * @param cleanTransitively whether to clean enclosing closures transitively
+   * @return Cleaned closure if it was actually cleaned or the original 
closure otherwise
    */
-  def clean(
-      closure: AnyRef,
+  def clean[F <: AnyRef](
+      closure: F,
       checkSerializable: Boolean = true,
-      cleanTransitively: Boolean = true): Unit = {
-    if (ClosureCleaner.clean(closure, cleanTransitively, mutable.Map.empty)) {
+      cleanTransitively: Boolean = true): F = {
+    val cleanedClosureOpt = ClosureCleaner.clean(closure, cleanTransitively, 
mutable.Map.empty)
+    if (cleanedClosureOpt.isDefined) {
+      val cleanedClosure = cleanedClosureOpt.get
       try {
         if (checkSerializable && SparkEnv.get != null) {
-          SparkEnv.get.closureSerializer.newInstance().serialize(closure)
+          
SparkEnv.get.closureSerializer.newInstance().serialize(cleanedClosure: AnyRef)
         }
       } catch {
         case ex: Exception => throw new SparkException("Task not 
serializable", ex)
       }
+      cleanedClosure
+    } else {
+      closure
     }
   }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala 
b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
index 85a89aaede95..f9825b6c9a18 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
@@ -94,13 +94,14 @@ class ClosureCleanerSuite2 extends SparkFunSuite {
     // If the resulting closure is not serializable even after
     // cleaning, we expect ClosureCleaner to throw a SparkException
     if (serializableAfter) {
-      SparkClosureCleaner.clean(closure, checkSerializable = true, transitive)
+      val cleanedClosure = SparkClosureCleaner.clean(closure, 
checkSerializable = true, transitive)
+      assertSerializable(cleanedClosure, serializableAfter)
     } else {
       intercept[SparkException] {
         SparkClosureCleaner.clean(closure, checkSerializable = true, 
transitive)
       }
+      assertSerializable(closure, serializableAfter)
     }
-    assertSerializable(closure, serializableAfter)
   }
 
   test("clean basic serializable closures") {
diff --git 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/UdfToProtoUtils.scala
 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/UdfToProtoUtils.scala
index 486302266539..a2237367225a 100644
--- 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/UdfToProtoUtils.scala
+++ 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/UdfToProtoUtils.scala
@@ -58,8 +58,11 @@ private[sql] object UdfToProtoUtils {
       function: AnyRef,
       inputEncoders: Seq[AgnosticEncoder[_]],
       outputEncoder: AgnosticEncoder[_]): ByteString = {
-    ClosureCleaner.clean(function, cleanTransitively = true, mutable.Map.empty)
-    val bytes = SparkSerDeUtils.serialize(UdfPacket(function, inputEncoders, 
outputEncoder))
+    val cleanedFunction = ClosureCleaner
+      .clean(function, cleanTransitively = true, mutable.Map.empty)
+      .getOrElse(function)
+    val bytes =
+      SparkSerDeUtils.serialize(UdfPacket(cleanedFunction, inputEncoders, 
outputEncoder))
     checkDeserializable(bytes)
     ByteString.copyFrom(bytes)
   }
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
index f04b9da9b45d..52f726290a4b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
@@ -157,8 +157,9 @@ object StateSpec {
   def function[KeyType, ValueType, StateType, MappedType](
       mappingFunction: (Time, KeyType, Option[ValueType], State[StateType]) => 
Option[MappedType]
     ): StateSpec[KeyType, ValueType, StateType, MappedType] = {
-    SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
-    new StateSpecImpl(mappingFunction)
+    val cleanedMappingFunction =
+      SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
+    new StateSpecImpl(cleanedMappingFunction)
   }
 
   /**
@@ -175,10 +176,11 @@ object StateSpec {
   def function[KeyType, ValueType, StateType, MappedType](
       mappingFunction: (KeyType, Option[ValueType], State[StateType]) => 
MappedType
     ): StateSpec[KeyType, ValueType, StateType, MappedType] = {
-    SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
+    val cleanedMappingFunction =
+      SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
     val wrappedFunction =
       (time: Time, key: KeyType, value: Option[ValueType], state: 
State[StateType]) => {
-        Some(mappingFunction(key, value, state))
+        Some(cleanedMappingFunction(key, value, state))
       }
     new StateSpecImpl(wrappedFunction)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to