and124578963 opened a new pull request, #12984:
URL: https://github.com/apache/iceberg/pull/12984

   Hi all,
   
   After upgrading Apache Iceberg from version 1.7.1 to 1.7.2, I started 
encountering a Py4JJavaError related to Kryo serialization when using S3FileIO. 
The issue manifests when calling collect() on a PySpark DataFrame.
   
   ### Environment:
   - Spark version: 3.5.4
   - Running from: Jupyter Notebook
   - Iceberg IO: org.apache.iceberg.aws.s3.S3FileIO
   - Spark Serializer: org.apache.spark.serializer.KryoSerializer
   
   ### Spark Configuration:
   ```
   spark.sql.defaultCatalog: iceberg
   spark.sql.catalog.iceberg.io-impl: org.apache.iceberg.aws.s3.S3FileIO
   spark.sql.catalog.iceberg.client.credentials-provider: software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider
   spark.serializer: org.apache.spark.serializer.KryoSerializer
   spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
   spark.sql.catalog.iceberg: org.apache.iceberg.spark.SparkCatalog
   spark.sql.catalog.iceberg.type: hive
   ```
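
   For reference, a minimal sketch of how a session with this configuration can be built programmatically; the application name and the table read at the end are hypothetical placeholders, and the property keys mirror the configuration above:
   ```java
   import org.apache.spark.sql.SparkSession;

   public final class KryoRepro {
     public static void main(String[] args) {
       SparkSession spark = SparkSession.builder()
           .appName("iceberg-kryo-repro") // hypothetical name
           .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
           .config("spark.sql.extensions",
               "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
           .config("spark.sql.defaultCatalog", "iceberg")
           .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
           .config("spark.sql.catalog.iceberg.type", "hive")
           .config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
           .config("spark.sql.catalog.iceberg.client.credentials-provider",
               "software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider")
           .getOrCreate();

       // Collecting rows from any Iceberg table triggers the error below;
       // "db.tbl" is a placeholder table name.
       spark.table("db.tbl").collectAsList();
     }
   }
   ```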
   
   ### Error Stack Trace (root cause at the end):
   ```
   Py4JJavaError: An error occurred while calling o580.collectToPython.
   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.113.141.177 executor 1): java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
   Serialization trace:
   s3 (org.apache.iceberg.aws.s3.S3FileIO)
   io (org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize)
   at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:42)
   at org.apache.spark.util.SparkErrorUtils.tryOrIOException$(SparkErrorUtils.scala:33)
   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:94)
   at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:252)
   at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:109)
   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
   at org.apache.iceberg.spark.source.SparkInputPartition.table(SparkInputPartition.java:81)
   at org.apache.iceberg.spark.source.RowDataReader.<init>(RowDataReader.java:50)
   at org.apache.iceberg.spark.source.SparkRowReaderFactory.createReader(SparkRowReaderFactory.java:45)
   at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:84)
   at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
   ...
   Caused by: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
   Serialization trace:
   s3 (org.apache.iceberg.aws.s3.S3FileIO)
   io (org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize)
   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
   ...
   Caused by: java.lang.RuntimeException: Could not serialize lambda
   at com.esotericsoftware.kryo.serializers.ClosureSerializer.read(ClosureSerializer.java:78)
   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
   ... 57 more
   Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 113 out of bounds for length 28
   Serialization trace:
   chunks (org.apache.spark.util.io.ChunkedByteBuffer)
   allProperties (org.apache.iceberg.aws.AwsClientProperties)
   awsClientProperties (org.apache.iceberg.aws.AwsClientFactories$DefaultAwsClientFactory)
   capturedArgs (java.lang.invoke.SerializedLambda)
   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
   ...
   Caused by: java.lang.IndexOutOfBoundsException: Index 113 out of bounds for length 28
   at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
   ... 74 more
   ```
   ### Analysis:
   My understanding is that org.apache.spark.serializer.KryoSerializer has no registered serializer for org.apache.iceberg.util.SerializableMap, so Kryo serializes it field by field and fails when the map is held by AwsClientProperties inside a captured lambda (note the `capturedArgs (java.lang.invoke.SerializedLambda)` entry in the trace above).
   The problem first appears after https://github.com/apache/iceberg/pull/11971
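
   As a possible user-side mitigation (untested; the class below and whether it actually sidesteps the failure are my assumptions, not part of this PR), Spark's `spark.kryo.registrator` hook could route SerializableMap through Kryo's Java-serialization fallback:
   ```java
   import com.esotericsoftware.kryo.Kryo;
   import com.esotericsoftware.kryo.serializers.JavaSerializer;
   import org.apache.iceberg.util.SerializableMap;
   import org.apache.spark.serializer.KryoRegistrator;

   // Hypothetical workaround: serialize SerializableMap with standard Java
   // serialization instead of Kryo's field-by-field FieldSerializer.
   public class IcebergKryoRegistrator implements KryoRegistrator {
     @Override
     public void registerClasses(Kryo kryo) {
       kryo.register(SerializableMap.class, new JavaSerializer());
     }
   }
   ```
   This would be enabled by setting `spark.kryo.registrator` to the registrator's fully qualified class name.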
   
   ### Proposed Fix:
   I've replaced org.apache.iceberg.util.SerializableMap with a standard java.util.HashMap in the places where it was problematic (e.g., within AwsClientProperties and the classes it holds). Since the keys and values involved (typically Strings) are inherently serializable, a standard HashMap should not cause any serialization issues and is well supported by Kryo out of the box; see the sketch below.
   
   ### Result:
   After applying this change, the KryoException no longer occurs, and the 
Spark job completes successfully.
   Thanks!
   

