agrubb86 opened a new issue, #12178:
URL: https://github.com/apache/iceberg/issues/12178

   ### Apache Iceberg version
   
   1.7.1 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   To summarize: after writing to and then reading from an Iceberg table managed by Nessie using PySpark, query results on read are incorrect (the data in the parquet files themselves is correct), and certain types of queries cause SIGSEGVs in the Spark executors. I'm filing these together as a single issue because I only encounter the SIGSEGVs when data is being read incorrectly, and only in certain types of queries. For queries that do trigger SIGSEGVs, the number of SIGSEGVs per query ranges from 0 to 4 (the job is killed after 4 executor failures).
   
   **Environment:**
   ```
   - aarch64
   - Java 17.0.13+11
   - Spark 3.5.4
   - Iceberg 1.7.1
   - Nessie 0.101.2
   - EKS on AWS
   ```
   
   **Jars added to Spark:**
   ```
   - avatica-core-1.25.0.jar
   - aws-java-sdk-bundle-1.12.262.jar
   - hadoop-aws-3.3.4.jar
   - hbase-shaded-mapreduce-2.4.15.jar
   - hbase-spark-1.1.0-SNAPSHOT.jar
   - hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar
   - htrace-core4-4.2.0-incubating.jar
   - httpclient5-5.4.1.jar
   - httpcore5-5.3.1.jar
   - httpcore5-h2-5.3.1.jar
   - iceberg-aws-bundle-1.7.1.jar
   - iceberg-spark-runtime-3.5_2.12-1.7.1.jar
   - log4j-slf4j-impl-2.18.0.jar
   - mysql-connector-j-9.1.0.jar
   - nessie-spark-extensions-3.5_2.12-0.101.2.jar
   - protobuf-java-3.21.9.jar
   - slf4j-api-1.7.36.jar
   - spark-hadoop-cloud_2.12-3.5.4.jar
   
   ```
   
   **(Potentially) Relevant Spark configs:**
   ```
   spark.hadoop.fs.s3a.impl                   org.apache.hadoop.fs.s3a.S3AFileSystem
   spark.hadoop.fs.s3a.connection.ssl.enabled false
   spark.hadoop.fs.s3a.fast.upload            true
   spark.hive.metastore.uris                  thrift://XXX:9083
   spark.kryo.registrationRequired            false
   spark.kryo.unsafe                          false
   spark.kryoserializer.buffer                1m
   spark.kryoserializer.buffer.max            1g
   spark.serializer                           org.apache.spark.serializer.KryoSerializer
   spark.shuffle.sort.io.plugin.class         org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
   spark.shuffle.useOldFetchProtocol          true
   spark.sql.catalogImplementation            hive
   spark.sql.catalog.nessie                   org.apache.iceberg.spark.SparkCatalog
   spark.sql.catalog.nessie.auth_type         NONE
   spark.sql.catalog.nessie.catalog-impl      org.apache.iceberg.nessie.NessieCatalog
   spark.sql.catalog.nessie.ref               dev
   spark.sql.catalog.nessie.uri               http://XXX:19120/api/v1
   spark.sql.catalog.nessie.warehouse         s3a://XXX/
   spark.sql.execution.arrow.pyspark.enabled  true
   spark.sql.extensions                       org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions
   spark.sql.sources.partitionOverwriteMode   dynamic
   spark.sql.warehouse.dir                    s3a://XXX/
   spark.submit.deployMode                    cluster
   ```
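   
   For completeness, a minimal PySpark sketch of how a session with the Iceberg/Nessie-relevant configs above can be built (the `XXX` values are redacted endpoints, as in the dump; only a subset of the configs is shown):
   ```
   from pyspark.sql import SparkSession
   
   # Sketch of the session setup; the XXX placeholders stand in for the
   # redacted Nessie endpoint and S3 warehouse from the config dump above.
   spark = (
       SparkSession.builder
       .appName("iceberg-nessie-repro")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .config("spark.sql.extensions",
               "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
               "org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
       .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
       .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
       .config("spark.sql.catalog.nessie.uri", "http://XXX:19120/api/v1")
       .config("spark.sql.catalog.nessie.ref", "dev")
       .config("spark.sql.catalog.nessie.auth_type", "NONE")
       .config("spark.sql.catalog.nessie.warehouse", "s3a://XXX/")
       .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
       .getOrCreate()
   )
   ```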
   
   **Source table definition:**
   ```
   CREATE TABLE source_table (
     id1 STRING,
     id2 STRING,
     pt_col1 STRING,
     cnt INT,
     dt STRING)
   USING iceberg
   PARTITIONED BY (dt, pt_col1)
   TBLPROPERTIES (
     'format' = 'iceberg/parquet',
     'format-version' = '2',
     'gc.enabled' = 'false',
     'write.metadata.delete-after-commit.enabled' = 'false',
     'write.parquet.compression-codec' = 'zstd')
   ```
   
   **Example source data:**
   ```
   id1                | id2         | pt_col1 | cnt | dt
   1a3bcbf1-f334-4408 | example.com | val1    | 1   | 2025-01-25
   ```
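   
   For a standalone repro, rows like the above can be seeded via `spark.sql` (the `nessie.db` namespace is hypothetical; substitute the real catalog and namespace):
   ```
   # Hypothetical seeding of one sample row; column order matches the DDL above.
   spark.sql("""
       INSERT INTO nessie.db.source_table
       VALUES ('1a3bcbf1-f334-4408', 'example.com', 'val1', 1, '2025-01-25')
   """)
   ```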
   
   **Sink table definition:**
   ```
   CREATE TABLE sink_table (
     id1 STRING,
     pt_col1 STRING,
     distinct_id2 INT,
     dt STRING)
   USING iceberg
   PARTITIONED BY (dt, pt_col1)
   TBLPROPERTIES (
     'format' = 'iceberg/parquet',
     'format-version' = '2',
     'gc.enabled' = 'false',
     'write.metadata.delete-after-commit.enabled' = 'false',
     'write.parquet.compression-codec' = 'zstd')
   ```
   
   **Insert query:**
   ```
   INSERT OVERWRITE TABLE sink_table
       PARTITION (dt = '2025-01-26')
   SELECT id1,
          pt_col1,
          COUNT(DISTINCT id2)
   FROM source_table
   WHERE dt >= '2024-10-29'
     AND dt <= '2025-01-26'
   GROUP BY 1, 2
   ```
   
   _At this point, data in the parquet files is correct according to_ `hadoop jar parquet-tools-1.11.0.jar dump`_, i.e. all id1's have_ `distinct_id2 >= 1`
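   
   The same cross-check can also be done from PySpark by reading a data file directly, bypassing the Iceberg read path (the path below is hypothetical; real paths can be pulled from the table's `files` metadata table):
   ```
   # Read one of the freshly written parquet files directly (bypassing Iceberg)
   # and confirm no rows have distinct_id2 = 0. The path is hypothetical; real
   # paths come from e.g. SELECT file_path FROM nessie.db.sink_table.files.
   raw = spark.read.parquet("s3a://XXX/sink_table/data/dt=2025-01-26/pt_col1=val2/XXX.parquet")
   print(raw.filter("distinct_id2 = 0").count())  # 0 against the raw file
   ```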
   
   **Data stats:**
   ```
   dt         | pt_col1 | count(1)
   2025-01-25 | val2    | 3402
   2025-01-25 | val1    | 1022
   2025-01-24 | val1    | 519
   2025-01-24 | val2    | 1777
   2025-01-26 | val2    | 5158
   2025-01-26 | val1    | 1524
   
   ```
   _Note that_ `dt='2025-01-26', pt_col1='val2'` _is the only partition combination where data incorrectness and SIGSEGVs are encountered, and_ `num_rows_incorrect = num_total_rows - 5000` _with_ `SELECT *`_. The list of id1's that come back incorrect is the same each time the insert query is run._
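   
   A sketch of how that id1 list can be isolated, diffing the Iceberg read path against the raw data files (namespace and data path are hypothetical, as above):
   ```
   # Rows the Iceberg read path returns as 0 but that are >= 1 in the raw files.
   # Caveat: the raw directory may also hold files from earlier snapshots, so in
   # practice restrict this to the file paths of the current snapshot.
   iceberg_df = (spark.table("nessie.db.sink_table")
                 .filter("dt = '2025-01-26' AND pt_col1 = 'val2'"))
   raw_df = spark.read.parquet("s3a://XXX/sink_table/data/dt=2025-01-26/pt_col1=val2/")
   
   bad_ids = (iceberg_df.filter("distinct_id2 = 0")
              .join(raw_df.filter("distinct_id2 >= 1"), "id1")
              .select("id1").distinct())
   bad_ids.show(200, truncate=False)  # the same 158 ids on every run
   ```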
   
   **The following queries return correct results without SIGSEGV:**
   ```
   SELECT [* / COUNT(*)] 
   FROM sink_table 
   WHERE distinct_id2 = 0
   ```
   _Returns:_ [0 rows / 0]
   
   ```
   SELECT COUNT(*) 
   FROM sink_table 
   WHERE dt = '2025-01-26'
     AND pt_col1 = 'val2'
   ```
   _Returns:_ a count of 5158
   
   **The following query returns correct results with SIGSEGV:**
   ```
   SELECT id1, 
          count(*) 
   FROM sink_table 
   WHERE distinct_id2 = 2 
   GROUP BY 1
   ```
   _Returns:_ 1 row
   
   **The following queries return incorrect results without SIGSEGV:**
   ```
   SELECT * 
   FROM sink_table
   WHERE dt = '2025-01-26' 
     AND pt_col1 = 'val2'
   ```
   _Returns:_ 158 rows with `distinct_id2 = 0`
   _Expected:_ 0 rows with `distinct_id2 = 0`
   
   ```
   INSERT OVERWRITE TABLE sink_table_clone
   SELECT *
   FROM sink_table
   WHERE dt = '2025-01-26'
   ```
   _Writes a parquet file with_ `distinct_id2 = 0` _as the value for 158 rows_
   
   **The following queries return incorrect results with SIGSEGV:**
   ```
   SELECT * 
   FROM sink_table 
   WHERE dt = '2025-01-26'
     AND pt_col1 = 'val2'
   ORDER BY distinct_id2 DESC
   ```
   _Returns:_ 158 rows with `distinct_id2 = 0`
   _Expected:_ 0 rows with `distinct_id2 = 0`
   
   ```
   SELECT id1, 
          count(*) 
   FROM sink_table 
   WHERE distinct_id2 = 1 
   GROUP BY 1
   ```
   _Returns:_ 4999 rows
   _Expected:_ 5157 rows
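   
   A hypothetical repro driver for the above, re-running the problematic query to observe both the wrong counts and the intermittent executor deaths (namespace as above):
   ```
   # Re-run the ORDER BY query a few times; executor SIGSEGVs surface as task
   # failures, and the row count of distinct_id2 = 0 should be 0 but is not.
   for i in range(5):
       n = (spark.table("nessie.db.sink_table")
            .filter("dt = '2025-01-26' AND pt_col1 = 'val2'")
            .orderBy("distinct_id2", ascending=False)
            .filter("distinct_id2 = 0")
            .count())
       print(i, n)  # consistently 158 here; expected 0
   ```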
   
   ***Segfaults:***
   ```
   25/02/04 10:50:03 INFO Executor: Running task 0.0 in stage 249.0 (TID 197)
   25/02/04 10:50:03 INFO MapOutputTrackerWorker: Updating epoch to 89 and clearing cache
   25/02/04 10:50:03 INFO TorrentBroadcast: Started reading broadcast variable 268 with 1 pieces (estimated total size 4.0 MiB)
   25/02/04 10:50:03 INFO MemoryStore: Block broadcast_268_piece0 stored as bytes in memory (estimated size 21.3 KiB, free 5.9 GiB)
   25/02/04 10:50:03 INFO TorrentBroadcast: Reading broadcast variable 268 took 4 ms
   25/02/04 10:50:03 INFO MemoryStore: Block broadcast_268 stored as values in memory (estimated size 48.2 KiB, free 5.9 GiB)
   #
   # A fatal error has been detected by the Java Runtime Environment:
   #
   #  SIGSEGV (0xb) at pc=0x0000ffff99f54ae0, pid=15, tid=282
   #
   # JRE version: OpenJDK Runtime Environment Temurin-17.0.13+11 (17.0.13+11) (build 17.0.13+11)
   # Java VM: OpenJDK 64-Bit Server VM Temurin-17.0.13+11 (17.0.13+11, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-aarch64)
   # Problematic frame:
   # V  [libjvm.so+0xd44ae0]  ConcurrentHashTable<SymbolTableConfig, (MEMFLAGS)10>::Node* ConcurrentHashTable<SymbolTableConfig, (MEMFLAGS)10>::get_node<SymbolTableLookup>(ConcurrentHashTable<SymbolTableConfig, (MEMFLAGS)10>::Bucket const*, SymbolTableLookup&, bool*, unsigned long*) const [clone .isra.0]+0x50
   ```
   _This segfault is seen the overwhelming majority of the time. The other three were seen either once or twice each. The attached core dump is for this segfault; I wasn't able to retrieve the core dumps for the others due to the pod going away too quickly._
   
[core_dump.txt](https://github.com/user-attachments/files/18669734/core_dump.txt)
   
   ```
   25/02/04 09:58:36 INFO Executor: Running task 0.0 in stage 201.0 (TID 159)
   25/02/04 09:58:36 INFO TorrentBroadcast: Started reading broadcast variable 211 with 1 pieces (estimated total size 4.0 MiB)
   25/02/04 09:58:36 INFO MemoryStore: Block broadcast_211_piece0 stored as bytes in memory (estimated size 12.2 KiB, free 5.9 GiB)
   25/02/04 09:58:36 INFO TorrentBroadcast: Reading broadcast variable 211 took 4 ms
   25/02/04 09:58:36 INFO MemoryStore: Block broadcast_211 stored as values in memory (estimated size 29.0 KiB, free 5.9 GiB)
   25/02/04 09:58:36 INFO TransportClientFactory: Found inactive connection to spark-2f557594cfd0870d-driver-svc.spark-dev.svc/10.3.26.54:7078, creating a new one.
   25/02/04 09:58:36 INFO TransportClientFactory: Successfully created connection to spark-2f557594cfd0870d-driver-svc.spark-dev.svc/10.3.26.54:7078 after 0 ms (0 ms spent in bootstraps)
   25/02/04 09:58:36 INFO CodeGenerator: Code generated in 46.470428 ms
   25/02/04 09:58:36 INFO SerializableTableWithSize: Releasing resources
   25/02/04 09:58:36 INFO SerializableTableWithSize: Releasing resources
   25/02/04 09:58:36 INFO CodeGenerator: Code generated in 15.156982 ms
   25/02/04 09:58:36 INFO TorrentBroadcast: Started reading broadcast variable 210 with 1 pieces (estimated total size 4.0 MiB)
   25/02/04 09:58:36 INFO MemoryStore: Block broadcast_210_piece0 stored as bytes in memory (estimated size 30.8 KiB, free 5.8 GiB)
   25/02/04 09:58:36 INFO TorrentBroadcast: Reading broadcast variable 210 took 4 ms
   25/02/04 09:58:36 INFO MemoryStore: Block broadcast_210 stored as values in memory (estimated size 32.0 KiB, free 5.8 GiB)
   25/02/04 09:58:36 INFO S3AInputStream: Switching to Random IO seek policy
   25/02/04 09:58:36 INFO CodecPool: Got brand-new decompressor [.zstd]
   25/02/04 09:58:36 INFO S3AInputStream: Switching to Random IO seek policy
   25/02/04 09:58:36 INFO CodecPool: Got brand-new decompressor [.zstd]
   #
   # A fatal error has been detected by the Java Runtime Environment:
   #
   #  SIGSEGV (0xb) at pc=0x0000ffff9093553c, pid=17, tid=162
   #
   # JRE version: OpenJDK Runtime Environment Temurin-17.0.13+11 (17.0.13+11) (build 17.0.13+11)
   # Java VM: OpenJDK 64-Bit Server VM Temurin-17.0.13+11 (17.0.13+11, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-aarch64)
   # Problematic frame:
   # j  org.apache.iceberg.shaded.io.netty.util.internal.InternalThreadLocalMap.slowGet()Lorg/apache/iceberg/shaded/io/netty/util/internal/InternalThreadLocalMap;+0
   ```
   
   ```
   25/02/04 12:39:14 INFO Executor: Running task 0.0 in stage 286.3 (TID 233)
   25/02/04 12:39:14 INFO MapOutputTrackerWorker: Updating epoch to 106 and clearing cache
   25/02/04 12:39:14 INFO TorrentBroadcast: Started reading broadcast variable 333 with 1 pieces (estimated total size 4.0 MiB)
   25/02/04 12:39:14 INFO MemoryStore: Block broadcast_333_piece0 stored as bytes in memory (estimated size 18.0 KiB, free 5.9 GiB)
   25/02/04 12:39:14 INFO TorrentBroadcast: Reading broadcast variable 333 took 13 ms
   25/02/04 12:39:14 INFO MemoryStore: Block broadcast_333 stored as values in memory (estimated size 39.2 KiB, free 5.9 GiB)
   25/02/04 12:39:14 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 79, fetching them
   25/02/04 12:39:14 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://mapoutputtrac...@spark-2f557594cfd0870d-driver-svc.spark-dev.svc:7078)
   25/02/04 12:39:14 INFO MapOutputTrackerWorker: Got the map output locations
   25/02/04 12:39:14 INFO ShuffleBlockFetcherIterator: Getting 200 (218.1 KiB) non-empty blocks including 200 (218.1 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
   25/02/04 12:39:14 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 13 ms
   #
   # A fatal error has been detected by the Java Runtime Environment:
   #
   #  SIGSEGV (0xb) at pc=0x0000ffff7f7fbee0, pid=15, tid=59
   #
   # JRE version: OpenJDK Runtime Environment Temurin-17.0.13+11 (17.0.13+11) (build 17.0.13+11)
   # Java VM: OpenJDK 64-Bit Server VM Temurin-17.0.13+11 (17.0.13+11, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-aarch64)
   # Problematic frame:
   # V  [libjvm.so+0x7cbee0]  AccessInternal::PostRuntimeDispatch<G1BarrierSet::AccessBarrier<282726ul, G1BarrierSet>, (AccessInternal::BarrierType)3, 282726ul>::oop_access_barrier(oopDesc*, long)+0x0
   ```
   
   ```
   25/02/04 12:30:02 INFO Executor: Running task 0.0 in stage 286.0 (TID 224)
   25/02/04 12:30:02 INFO MapOutputTrackerWorker: Updating epoch to 100 and clearing cache
   25/02/04 12:30:02 INFO TorrentBroadcast: Started reading broadcast variable 327 with 1 pieces (estimated total size 4.0 MiB)
   25/02/04 12:30:02 INFO MemoryStore: Block broadcast_327_piece0 stored as bytes in memory (estimated size 18.0 KiB, free 5.9 GiB)
   25/02/04 12:30:02 INFO TorrentBroadcast: Reading broadcast variable 327 took 6 ms
   25/02/04 12:30:02 INFO MemoryStore: Block broadcast_327 stored as values in memory (estimated size 39.2 KiB, free 5.9 GiB)
   25/02/04 12:30:02 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 79, fetching them
   25/02/04 12:30:02 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://mapoutputtrac...@spark-2f557594cfd0870d-driver-svc.spark-dev.svc:7078)
   25/02/04 12:30:02 INFO MapOutputTrackerWorker: Got the map output locations
   25/02/04 12:30:02 INFO ShuffleBlockFetcherIterator: Getting 200 (218.1 KiB) non-empty blocks including 200 (218.1 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
   25/02/04 12:30:02 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
   25/02/04 12:30:02 INFO CodeGenerator: Code generated in 16.769295 ms
   #
   # A fatal error has been detected by the Java Runtime Environment:
   #
   #  SIGSEGV (0xb) at pc=0x0000ffff9421465c, pid=15, tid=39
   #
   # JRE version: OpenJDK Runtime Environment Temurin-17.0.13+11 (17.0.13+11) (build 17.0.13+11)
   # Java VM: OpenJDK 64-Bit Server VM Temurin-17.0.13+11 (17.0.13+11, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-aarch64)
   # Problematic frame:
   # V  [libjvm.so+0x6b465c]  void WeakProcessor::Task::work<G1STWIsAliveClosure, G1KeepAliveClosure>(unsigned int, G1STWIsAliveClosure*, G1KeepAliveClosure*)+0x1fc
   ```
   
   Please let me know if I can be of further assistance.
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
   - [x] I cannot contribute a fix for this bug at this time

