and124578963 opened a new pull request, #12984: URL: https://github.com/apache/iceberg/pull/12984
Hi all,

After upgrading Apache Iceberg from version 1.7.1 to 1.7.2, I started encountering a `Py4JJavaError` related to Kryo serialization when using `S3FileIO`. The issue manifests when calling `collect()` on a PySpark DataFrame.

### Environment:
- Spark version: 3.5.4
- Running from: Jupyter Notebook
- Iceberg IO: `org.apache.iceberg.aws.s3.S3FileIO`
- Spark serializer: `org.apache.spark.serializer.KryoSerializer`

### Spark Configuration:
```
spark.sql.defaultCatalog: iceberg
spark.sql.catalog.iceberg.io-impl: org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.iceberg.client.credentials-provider: software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.iceberg: org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type: hive
```

### **Error Stack Trace (abridged; the root cause is in the final `Caused by` entries):**
```
Py4JJavaError: An error occurred while calling o580.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.113.141.177 executor 1): java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
s3 (org.apache.iceberg.aws.s3.S3FileIO)
io (org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize)
    at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:42)
    at org.apache.spark.util.SparkErrorUtils.tryOrIOException$(SparkErrorUtils.scala:33)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:94)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:252)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:109)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.iceberg.spark.source.SparkInputPartition.table(SparkInputPartition.java:81)
    at org.apache.iceberg.spark.source.RowDataReader.<init>(RowDataReader.java:50)
    at org.apache.iceberg.spark.source.SparkRowReaderFactory.createReader(SparkRowReaderFactory.java:45)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:84)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    ...
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
s3 (org.apache.iceberg.aws.s3.S3FileIO)
io (org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    ...
Caused by: java.lang.RuntimeException: Could not serialize lambda
    at com.esotericsoftware.kryo.serializers.ClosureSerializer.read(ClosureSerializer.java:78)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 57 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 113 out of bounds for length 28
Serialization trace:
chunks (org.apache.spark.util.io.ChunkedByteBuffer)
allProperties (org.apache.iceberg.aws.AwsClientProperties)
awsClientProperties (org.apache.iceberg.aws.AwsClientFactories$DefaultAwsClientFactory)
capturedArgs (java.lang.invoke.SerializedLambda)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    ...
Caused by: java.lang.IndexOutOfBoundsException: Index 113 out of bounds for length 28
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
    ... 74 more
```

### Analysis:
My understanding is that Kryo either has no registered serializer for `org.apache.iceberg.util.SerializableMap`, or has general trouble serializing this class in this context, particularly when it is held by `AwsClientProperties` and reached through a lambda's `capturedArgs` (see the last two `Caused by` entries above). The problem first appears with https://github.com/apache/iceberg/pull/11971.
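To make the capture path concrete, here is a minimal sketch of the pattern the serialization trace points at. The class and member names are illustrative stand-ins, not Iceberg's actual code:

```java
import java.io.Serializable;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative stand-in for the shape shown in the serialization trace:
// a serializable lambda whose capturedArgs drag in an instance field
// (in Iceberg, the SerializableMap held by AwsClientProperties).
public class CapturedMapSketch implements Serializable {

  // In Iceberg 1.7.2 the corresponding field is a SerializableMap; a plain
  // Map is used here only to keep the sketch self-contained.
  private final Map<String, String> allProperties;

  public CapturedMapSketch(Map<String, String> allProperties) {
    this.allProperties = allProperties;
  }

  // The intersection cast makes javac emit a SerializedLambda. Because the
  // lambda body reads an instance field, `this` (and with it allProperties)
  // lands in the lambda's capturedArgs, so Kryo's ClosureSerializer has to
  // serialize the map -- the step that fails in the trace above.
  public Supplier<String> regionSupplier() {
    return (Supplier<String> & Serializable) () -> allProperties.get("client.region");
  }
}
```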
### Proposed Fix:
I've replaced the usage of `org.apache.iceberg.util.SerializableMap` with a standard `java.util.HashMap` in the relevant places (e.g., within `AwsClientProperties` or the classes it holds, where the `SerializableMap` was problematic); a simplified before/after sketch follows at the end of this description. Since the keys and values involved are typically Strings and therefore inherently serializable, a standard `HashMap` should not cause any serialization issues and is well supported by Kryo.

### Result:
After applying this change, the `KryoException` no longer occurs and the Spark job completes successfully.

Thanks!
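For reference, a simplified before/after sketch of the change. The class below is a hypothetical stand-in (the actual edit is in the AWS module, e.g. `AwsClientProperties`), and the "before" line is an assumption about the current code:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the affected class, to show the shape of the fix.
public class ClientPropertiesSketch implements Serializable {

  private final Map<String, String> allProperties;

  public ClientPropertiesSketch(Map<String, String> properties) {
    // Before (assumed): this.allProperties = SerializableMap.copyOf(properties);
    // After: copy into a plain HashMap. The keys and values are Strings, so
    // the map stays fully serializable and is handled by Kryo out of the box.
    this.allProperties = new HashMap<>(properties);
  }
}
```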