This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 420dd895c fix: route file-not-found errors through SparkError JSON 
path (#3699)
420dd895c is described below

commit 420dd895c1482b72f251109da8e63bd8e9fcc7e6
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 16 13:31:37 2026 -0600

    fix: route file-not-found errors through SparkError JSON path (#3699)
---
 dev/diffs/3.5.8.diff                               | 53 +++-------------------
 native/core/src/errors.rs                          | 44 ++++++++++++------
 native/spark-expr/src/error.rs                     | 15 ++++++
 .../scala/org/apache/comet/CometExecIterator.scala | 15 +-----
 .../sql/comet/shims/ShimSparkErrorConverter.scala  | 20 ++++++++
 .../sql/comet/shims/ShimSparkErrorConverter.scala  | 20 ++++++++
 .../sql/comet/shims/ShimSparkErrorConverter.scala  | 21 +++++++++
 7 files changed, 115 insertions(+), 73 deletions(-)

diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 08b69a6b8..568b33e75 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -502,18 +502,10 @@ index a206e97c353..79813d8e259 100644
  
    test("SPARK-35884: Explain Formatted") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 93275487f29..510e3087e0f 100644
+index 93275487f29..ca79ad8b6d9 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
- 
- import scala.collection.mutable
- 
-+import org.apache.comet.CometConf
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.{LocalFileSystem, Path}
- 
-@@ -33,6 +34,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
+@@ -33,6 +33,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
  import 
org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt,
 positiveInt}
  import org.apache.spark.sql.catalyst.plans.logical.Filter
  import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -521,16 +513,7 @@ index 93275487f29..510e3087e0f 100644
  import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
  import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest
-               case "" => "_LEGACY_ERROR_TEMP_2062"
-               case _ => "_LEGACY_ERROR_TEMP_2055"
-             }
-+            // native_datafusion Parquet scan cannot throw a 
SparkFileNotFoundException
-+            assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != 
CometConf.SCAN_NATIVE_DATAFUSION)
-             checkErrorMatchPVals(
-               exception = intercept[SparkException] {
-                 testIgnoreMissingFiles(options)
-@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -639,7 +640,8 @@ class FileBasedDataSourceSuite extends QueryTest
    }
  
    Seq("parquet", "orc").foreach { format =>
@@ -540,7 +523,7 @@ index 93275487f29..510e3087e0f 100644
        withTempDir { dir =>
          val tableName = s"spark_25132_${format}_native"
          val tableDir = dir.getCanonicalPath + s"/$tableName"
-@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -955,6 +957,7 @@ class FileBasedDataSourceSuite extends QueryTest
              assert(bJoinExec.isEmpty)
              val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
                case smJoin: SortMergeJoinExec => smJoin
@@ -548,7 +531,7 @@ index 93275487f29..510e3087e0f 100644
              }
              assert(smJoinExec.nonEmpty)
            }
-@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1015,6 +1018,7 @@ class FileBasedDataSourceSuite extends QueryTest
  
            val fileScan = df.queryExecution.executedPlan collectFirst {
              case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -556,7 +539,7 @@ index 93275487f29..510e3087e0f 100644
            }
            assert(fileScan.nonEmpty)
            assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1056,6 +1060,7 @@ class FileBasedDataSourceSuite extends QueryTest
  
            val fileScan = df.queryExecution.executedPlan collectFirst {
              case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -564,7 +547,7 @@ index 93275487f29..510e3087e0f 100644
            }
            assert(fileScan.nonEmpty)
            assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1240,6 +1245,9 @@ class FileBasedDataSourceSuite extends QueryTest
            val filters = df.queryExecution.executedPlan.collect {
              case f: FileSourceScanLike => f.dataFilters
              case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -1409,28 +1392,6 @@ index 47679ed7865..9ffbaecb98e 100644
      }.length == hashAggCount)
      assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s 
}.length == sortAggCount)
    }
-diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
-index a1147c16cc8..c7a29496328 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
-@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
- 
- import org.apache.spark.{SparkArithmeticException, SparkException, 
SparkFileNotFoundException}
- import org.apache.spark.sql._
-+import org.apache.spark.sql.IgnoreCometNativeDataFusion
- import org.apache.spark.sql.catalyst.TableIdentifier
- import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide}
- import org.apache.spark.sql.catalyst.parser.ParseException
-@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
-     }
-   }
- 
--  test("alter temporary view should follow current storeAnalyzedPlanForView 
config") {
-+  test("alter temporary view should follow current storeAnalyzedPlanForView 
config",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314"))
 {
-     withTable("t") {
-       Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
-       withView("v1") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 index eec396b2e39..bf3f1c769d6 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs
index 7c8957dba..d4582da63 100644
--- a/native/core/src/errors.rs
+++ b/native/core/src/errors.rs
@@ -413,7 +413,37 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, 
backtrace: Option<Strin
                     // Fall back to plain SparkError (no context)
                     throw_spark_error_as_json(env, spark_error)
                 } else {
-                    // Not a SparkError, use generic exception
+                    // Check for file-not-found errors from object store
+                    let error_msg = e.to_string();
+                    if error_msg.contains("not found")
+                        && error_msg.contains("No such file or directory")
+                    {
+                        let spark_error = SparkError::FileNotFound { message: 
error_msg };
+                        throw_spark_error_as_json(env, &spark_error)
+                    } else {
+                        // Not a SparkError, use generic exception
+                        let exception = error.to_exception();
+                        match backtrace {
+                            Some(backtrace_string) => env.throw_new(
+                                exception.class,
+                                to_stacktrace_string(exception.msg, 
backtrace_string).unwrap(),
+                            ),
+                            _ => env.throw_new(exception.class, exception.msg),
+                        }
+                    }
+                }
+            }
+            // Handle direct SparkError - serialize to JSON
+            CometError::Spark(spark_error) => throw_spark_error_as_json(env, 
spark_error),
+            _ => {
+                // Check for file-not-found errors that may arrive through 
other wrapping paths
+                let error_msg = error.to_string();
+                if error_msg.contains("not found")
+                    && error_msg.contains("No such file or directory")
+                {
+                    let spark_error = SparkError::FileNotFound { message: 
error_msg };
+                    throw_spark_error_as_json(env, &spark_error)
+                } else {
                     let exception = error.to_exception();
                     match backtrace {
                         Some(backtrace_string) => env.throw_new(
@@ -424,18 +454,6 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, 
backtrace: Option<Strin
                     }
                 }
             }
-            // Handle direct SparkError - serialize to JSON
-            CometError::Spark(spark_error) => throw_spark_error_as_json(env, 
spark_error),
-            _ => {
-                let exception = error.to_exception();
-                match backtrace {
-                    Some(backtrace_string) => env.throw_new(
-                        exception.class,
-                        to_stacktrace_string(exception.msg, 
backtrace_string).unwrap(),
-                    ),
-                    _ => env.throw_new(exception.class, exception.msg),
-                }
-            }
         }
         .expect("Thrown exception")
     }
diff --git a/native/spark-expr/src/error.rs b/native/spark-expr/src/error.rs
index ae3b5c0ed..592ed8b44 100644
--- a/native/spark-expr/src/error.rs
+++ b/native/spark-expr/src/error.rs
@@ -166,6 +166,9 @@ pub enum SparkError {
     #[error("[SCALAR_SUBQUERY_TOO_MANY_ROWS] Scalar subquery returned more 
than one row.")]
     ScalarSubqueryTooManyRows,
 
+    #[error("{message}")]
+    FileNotFound { message: String },
+
     #[error("ArrowError: {0}.")]
     Arrow(Arc<ArrowError>),
 
@@ -236,6 +239,7 @@ impl SparkError {
             SparkError::InvalidRegexGroupIndex { .. } => 
"InvalidRegexGroupIndex",
             SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder",
             SparkError::ScalarSubqueryTooManyRows => 
"ScalarSubqueryTooManyRows",
+            SparkError::FileNotFound { .. } => "FileNotFound",
             SparkError::Arrow(_) => "Arrow",
             SparkError::Internal(_) => "Internal",
         }
@@ -421,6 +425,11 @@ impl SparkError {
                     "dataType": data_type,
                 })
             }
+            SparkError::FileNotFound { message } => {
+                serde_json::json!({
+                    "message": message,
+                })
+            }
             SparkError::Arrow(e) => {
                 serde_json::json!({
                     "message": e.to_string(),
@@ -487,6 +496,9 @@ impl SparkError {
             SparkError::DatatypeCannotOrder { .. }
             | SparkError::InvalidUtf8String { .. } => 
"org/apache/spark/SparkIllegalArgumentException",
 
+            // FileNotFound - will be converted to SparkFileNotFoundException 
by the shim
+            SparkError::FileNotFound { .. } => 
"org/apache/spark/SparkException",
+
             // Generic errors
             SparkError::Arrow(_) | SparkError::Internal(_) => 
"org/apache/spark/SparkException",
         }
@@ -559,6 +571,9 @@ impl SparkError {
             // Subquery errors
             SparkError::ScalarSubqueryTooManyRows => 
Some("SCALAR_SUBQUERY_TOO_MANY_ROWS"),
 
+            // File not found
+            SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"),
+
             // Generic errors (no error class)
             SparkError::Arrow(_) | SparkError::Internal(_) => None,
         }
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala 
b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 28c164571..44ebf7e36 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -19,11 +19,8 @@
 
 package org.apache.comet
 
-import java.io.FileNotFoundException
 import java.lang.management.ManagementFactory
 
-import scala.util.matching.Regex
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
@@ -163,19 +160,9 @@ class CometExecIterator(
         // threw the exception, so we log the exception with taskAttemptId here
         logError(s"Native execution for task $taskAttemptId failed", e)
 
-        val fileNotFoundPattern: Regex =
-          ("""^External: Object at location (.+?) not found: No such file or 
directory """ +
-            """\(os error \d+\)$""").r
-        val parquetError: Regex =
+        val parquetError: scala.util.matching.Regex =
           """^Parquet error: (?:.*)$""".r
         e.getMessage match {
-          case fileNotFoundPattern(filePath) =>
-            // See 
org.apache.spark.sql.errors.QueryExecutionErrors.readCurrentFileNotFoundError
-            throw new SparkException(
-              errorClass = "_LEGACY_ERROR_TEMP_2055",
-              messageParameters = Map("message" -> e.getMessage),
-              cause = new FileNotFoundException(filePath)
-            ) // Can't use SparkFileNotFoundException because it's private.
           case parquetError() =>
             // See 
org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError
             // See org.apache.parquet.hadoop.ParquetFileReader for error 
message.
diff --git 
a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
 
b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index 8e6ed1a92..da65b1eb4 100644
--- 
a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++ 
b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -19,12 +19,20 @@
 
 package org.apache.spark.sql.comet.shims
 
+import java.io.FileNotFoundException
+
+import scala.util.matching.Regex
+
 import org.apache.spark.{QueryContext, SparkException}
 import org.apache.spark.sql.catalyst.trees.SQLQueryContext
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+object ShimSparkErrorConverter {
+  val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r
+}
+
 /**
  * Spark 3.4 implementation for converting error types to proper Spark 
exceptions.
  *
@@ -243,6 +251,18 @@ trait ShimSparkErrorConverter {
           QueryExecutionErrors
             .intervalArithmeticOverflowError("Interval arithmetic overflow", 
"", sqlCtx(context)))
 
+      case "FileNotFound" =>
+        val msg = params("message").toString
+        // Extract file path from native error message and format like Hadoop's
+        // FileNotFoundException: "File <path> does not exist"
+        val path = ShimSparkErrorConverter.ObjectLocationPattern
+          .findFirstMatchIn(msg)
+          .map(_.group(1))
+          .getOrElse(msg)
+        Some(
+          QueryExecutionErrors.readCurrentFileNotFoundError(
+            new FileNotFoundException(s"File $path does not exist")))
+
       case _ =>
         None
     }
diff --git 
a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
 
b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index 9bd8c7dba..ae21d1276 100644
--- 
a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++ 
b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -19,12 +19,20 @@
 
 package org.apache.spark.sql.comet.shims
 
+import java.io.FileNotFoundException
+
+import scala.util.matching.Regex
+
 import org.apache.spark.{QueryContext, SparkException}
 import org.apache.spark.sql.catalyst.trees.SQLQueryContext
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+object ShimSparkErrorConverter {
+  val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r
+}
+
 /**
  * Spark 3.5 implementation for converting error types to proper Spark 
exceptions.
  *
@@ -239,6 +247,18 @@ trait ShimSparkErrorConverter {
           QueryExecutionErrors
             .intervalArithmeticOverflowError("Interval arithmetic overflow", 
"", sqlCtx(context)))
 
+      case "FileNotFound" =>
+        val msg = params("message").toString
+        // Extract file path from native error message and format like Hadoop's
+        // FileNotFoundException: "File <path> does not exist"
+        val path = ShimSparkErrorConverter.ObjectLocationPattern
+          .findFirstMatchIn(msg)
+          .map(_.group(1))
+          .getOrElse(msg)
+        Some(
+          QueryExecutionErrors.readCurrentFileNotFoundError(
+            new FileNotFoundException(s"File $path does not exist")))
+
       case _ =>
         None
     }
diff --git 
a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
 
b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index e49c789a7..01d4eac4b 100644
--- 
a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++ 
b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -19,12 +19,19 @@
 
 package org.apache.spark.sql.comet.shims
 
+import scala.util.matching.Regex
+
 import org.apache.spark.QueryContext
 import org.apache.spark.SparkException
+import org.apache.spark.SparkFileNotFoundException
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+object ShimSparkErrorConverter {
+  val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r
+}
+
 /**
  * Spark 4.0-specific implementation for converting error types to proper 
Spark exceptions.
  */
@@ -251,6 +258,20 @@ trait ShimSparkErrorConverter {
           
QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError(
             context.headOption.orNull))
 
+      case "FileNotFound" =>
+        val msg = params("message").toString
+        // Extract file path from native error message and format like Hadoop's
+        // FileNotFoundException: "File <path> does not exist"
+        val path = ShimSparkErrorConverter.ObjectLocationPattern
+          .findFirstMatchIn(msg)
+          .map(_.group(1))
+          .getOrElse(msg)
+        // readCurrentFileNotFoundError was removed in Spark 4.0; construct 
directly
+        Some(
+          new SparkFileNotFoundException(
+            errorClass = "_LEGACY_ERROR_TEMP_2055",
+            messageParameters = Map("message" -> s"File $path does not 
exist")))
+
       case _ =>
         // Unknown error type - return None to trigger fallback
         None


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to