This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 420dd895c fix: route file-not-found errors through SparkError JSON path (#3699)
420dd895c is described below
commit 420dd895c1482b72f251109da8e63bd8e9fcc7e6
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 16 13:31:37 2026 -0600
fix: route file-not-found errors through SparkError JSON path (#3699)
---
dev/diffs/3.5.8.diff | 53 +++-------------------
native/core/src/errors.rs | 44 ++++++++++++------
native/spark-expr/src/error.rs | 15 ++++++
.../scala/org/apache/comet/CometExecIterator.scala | 15 +-----
.../sql/comet/shims/ShimSparkErrorConverter.scala | 20 ++++++++
.../sql/comet/shims/ShimSparkErrorConverter.scala | 20 ++++++++
.../sql/comet/shims/ShimSparkErrorConverter.scala | 21 +++++++++
7 files changed, 115 insertions(+), 73 deletions(-)
diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 08b69a6b8..568b33e75 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -502,18 +502,10 @@ index a206e97c353..79813d8e259 100644
test("SPARK-35884: Explain Formatted") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 93275487f29..510e3087e0f 100644
+index 93275487f29..ca79ad8b6d9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
-
- import scala.collection.mutable
-
-+import org.apache.comet.CometConf
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.{LocalFileSystem, Path}
-
-@@ -33,6 +34,7 @@ import
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
+@@ -33,6 +33,7 @@ import
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
import
org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt,
positiveInt}
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -521,16 +513,7 @@ index 93275487f29..510e3087e0f 100644
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest
- case "" => "_LEGACY_ERROR_TEMP_2062"
- case _ => "_LEGACY_ERROR_TEMP_2055"
- }
-+ // native_datafusion Parquet scan cannot throw a
SparkFileNotFoundException
-+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
- checkErrorMatchPVals(
- exception = intercept[SparkException] {
- testIgnoreMissingFiles(options)
-@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -639,7 +640,8 @@ class FileBasedDataSourceSuite extends QueryTest
}
Seq("parquet", "orc").foreach { format =>
@@ -540,7 +523,7 @@ index 93275487f29..510e3087e0f 100644
withTempDir { dir =>
val tableName = s"spark_25132_${format}_native"
val tableDir = dir.getCanonicalPath + s"/$tableName"
-@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -955,6 +957,7 @@ class FileBasedDataSourceSuite extends QueryTest
assert(bJoinExec.isEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
@@ -548,7 +531,7 @@ index 93275487f29..510e3087e0f 100644
}
assert(smJoinExec.nonEmpty)
}
-@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1015,6 +1018,7 @@ class FileBasedDataSourceSuite extends QueryTest
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -556,7 +539,7 @@ index 93275487f29..510e3087e0f 100644
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1056,6 +1060,7 @@ class FileBasedDataSourceSuite extends QueryTest
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -564,7 +547,7 @@ index 93275487f29..510e3087e0f 100644
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1240,6 +1245,9 @@ class FileBasedDataSourceSuite extends QueryTest
val filters = df.queryExecution.executedPlan.collect {
case f: FileSourceScanLike => f.dataFilters
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -1409,28 +1392,6 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s
}.length == sortAggCount)
}
-diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
-index a1147c16cc8..c7a29496328 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
-@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
-
- import org.apache.spark.{SparkArithmeticException, SparkException,
SparkFileNotFoundException}
- import org.apache.spark.sql._
-+import org.apache.spark.sql.IgnoreCometNativeDataFusion
- import org.apache.spark.sql.catalyst.TableIdentifier
- import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide}
- import org.apache.spark.sql.catalyst.parser.ParseException
-@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with
SQLTestUtils {
- }
- }
-
-- test("alter temporary view should follow current storeAnalyzedPlanForView
config") {
-+ test("alter temporary view should follow current storeAnalyzedPlanForView
config",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314"))
{
- withTable("t") {
- Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
- withView("v1") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index eec396b2e39..bf3f1c769d6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs
index 7c8957dba..d4582da63 100644
--- a/native/core/src/errors.rs
+++ b/native/core/src/errors.rs
@@ -413,7 +413,37 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError,
backtrace: Option<Strin
// Fall back to plain SparkError (no context)
throw_spark_error_as_json(env, spark_error)
} else {
- // Not a SparkError, use generic exception
+ // Check for file-not-found errors from object store
+ let error_msg = e.to_string();
+ if error_msg.contains("not found")
+ && error_msg.contains("No such file or directory")
+ {
+ let spark_error = SparkError::FileNotFound { message:
error_msg };
+ throw_spark_error_as_json(env, &spark_error)
+ } else {
+ // Not a SparkError, use generic exception
+ let exception = error.to_exception();
+ match backtrace {
+ Some(backtrace_string) => env.throw_new(
+ exception.class,
+ to_stacktrace_string(exception.msg,
backtrace_string).unwrap(),
+ ),
+ _ => env.throw_new(exception.class, exception.msg),
+ }
+ }
+ }
+ }
+ // Handle direct SparkError - serialize to JSON
+ CometError::Spark(spark_error) => throw_spark_error_as_json(env,
spark_error),
+ _ => {
+ // Check for file-not-found errors that may arrive through
other wrapping paths
+ let error_msg = error.to_string();
+ if error_msg.contains("not found")
+ && error_msg.contains("No such file or directory")
+ {
+ let spark_error = SparkError::FileNotFound { message:
error_msg };
+ throw_spark_error_as_json(env, &spark_error)
+ } else {
let exception = error.to_exception();
match backtrace {
Some(backtrace_string) => env.throw_new(
@@ -424,18 +454,6 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError,
backtrace: Option<Strin
}
}
}
- // Handle direct SparkError - serialize to JSON
- CometError::Spark(spark_error) => throw_spark_error_as_json(env,
spark_error),
- _ => {
- let exception = error.to_exception();
- match backtrace {
- Some(backtrace_string) => env.throw_new(
- exception.class,
- to_stacktrace_string(exception.msg,
backtrace_string).unwrap(),
- ),
- _ => env.throw_new(exception.class, exception.msg),
- }
- }
}
.expect("Thrown exception")
}
diff --git a/native/spark-expr/src/error.rs b/native/spark-expr/src/error.rs
index ae3b5c0ed..592ed8b44 100644
--- a/native/spark-expr/src/error.rs
+++ b/native/spark-expr/src/error.rs
@@ -166,6 +166,9 @@ pub enum SparkError {
#[error("[SCALAR_SUBQUERY_TOO_MANY_ROWS] Scalar subquery returned more
than one row.")]
ScalarSubqueryTooManyRows,
+ #[error("{message}")]
+ FileNotFound { message: String },
+
#[error("ArrowError: {0}.")]
Arrow(Arc<ArrowError>),
@@ -236,6 +239,7 @@ impl SparkError {
SparkError::InvalidRegexGroupIndex { .. } =>
"InvalidRegexGroupIndex",
SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder",
SparkError::ScalarSubqueryTooManyRows =>
"ScalarSubqueryTooManyRows",
+ SparkError::FileNotFound { .. } => "FileNotFound",
SparkError::Arrow(_) => "Arrow",
SparkError::Internal(_) => "Internal",
}
@@ -421,6 +425,11 @@ impl SparkError {
"dataType": data_type,
})
}
+ SparkError::FileNotFound { message } => {
+ serde_json::json!({
+ "message": message,
+ })
+ }
SparkError::Arrow(e) => {
serde_json::json!({
"message": e.to_string(),
@@ -487,6 +496,9 @@ impl SparkError {
SparkError::DatatypeCannotOrder { .. }
| SparkError::InvalidUtf8String { .. } =>
"org/apache/spark/SparkIllegalArgumentException",
+ // FileNotFound - will be converted to SparkFileNotFoundException by the shim
+ SparkError::FileNotFound { .. } =>
"org/apache/spark/SparkException",
+
// Generic errors
SparkError::Arrow(_) | SparkError::Internal(_) =>
"org/apache/spark/SparkException",
}
@@ -559,6 +571,9 @@ impl SparkError {
// Subquery errors
SparkError::ScalarSubqueryTooManyRows =>
Some("SCALAR_SUBQUERY_TOO_MANY_ROWS"),
+ // File not found
+ SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"),
+
// Generic errors (no error class)
SparkError::Arrow(_) | SparkError::Internal(_) => None,
}
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 28c164571..44ebf7e36 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -19,11 +19,8 @@
package org.apache.comet
-import java.io.FileNotFoundException
import java.lang.management.ManagementFactory
-import scala.util.matching.Regex
-
import org.apache.hadoop.conf.Configuration
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
@@ -163,19 +160,9 @@ class CometExecIterator(
// threw the exception, so we log the exception with taskAttemptId here
logError(s"Native execution for task $taskAttemptId failed", e)
- val fileNotFoundPattern: Regex =
- ("""^External: Object at location (.+?) not found: No such file or
directory """ +
- """\(os error \d+\)$""").r
- val parquetError: Regex =
+ val parquetError: scala.util.matching.Regex =
"""^Parquet error: (?:.*)$""".r
e.getMessage match {
- case fileNotFoundPattern(filePath) =>
- // See
org.apache.spark.sql.errors.QueryExecutionErrors.readCurrentFileNotFoundError
- throw new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2055",
- messageParameters = Map("message" -> e.getMessage),
- cause = new FileNotFoundException(filePath)
- ) // Can't use SparkFileNotFoundException because it's private.
case parquetError() =>
// See
org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError
// See org.apache.parquet.hadoop.ParquetFileReader for error
message.
diff --git
a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index 8e6ed1a92..da65b1eb4 100644
---
a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++
b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -19,12 +19,20 @@
package org.apache.spark.sql.comet.shims
+import java.io.FileNotFoundException
+
+import scala.util.matching.Regex
+
import org.apache.spark.{QueryContext, SparkException}
import org.apache.spark.sql.catalyst.trees.SQLQueryContext
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+object ShimSparkErrorConverter {
+ val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r
+}
+
/**
* Spark 3.4 implementation for converting error types to proper Spark
exceptions.
*
@@ -243,6 +251,18 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors
.intervalArithmeticOverflowError("Interval arithmetic overflow",
"", sqlCtx(context)))
+ case "FileNotFound" =>
+ val msg = params("message").toString
+ // Extract file path from native error message and format like Hadoop's
+ // FileNotFoundException: "File <path> does not exist"
+ val path = ShimSparkErrorConverter.ObjectLocationPattern
+ .findFirstMatchIn(msg)
+ .map(_.group(1))
+ .getOrElse(msg)
+ Some(
+ QueryExecutionErrors.readCurrentFileNotFoundError(
+ new FileNotFoundException(s"File $path does not exist")))
+
case _ =>
None
}
diff --git
a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index 9bd8c7dba..ae21d1276 100644
---
a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++
b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -19,12 +19,20 @@
package org.apache.spark.sql.comet.shims
+import java.io.FileNotFoundException
+
+import scala.util.matching.Regex
+
import org.apache.spark.{QueryContext, SparkException}
import org.apache.spark.sql.catalyst.trees.SQLQueryContext
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+object ShimSparkErrorConverter {
+ val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r
+}
+
/**
* Spark 3.5 implementation for converting error types to proper Spark
exceptions.
*
@@ -239,6 +247,18 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors
.intervalArithmeticOverflowError("Interval arithmetic overflow",
"", sqlCtx(context)))
+ case "FileNotFound" =>
+ val msg = params("message").toString
+ // Extract file path from native error message and format like Hadoop's
+ // FileNotFoundException: "File <path> does not exist"
+ val path = ShimSparkErrorConverter.ObjectLocationPattern
+ .findFirstMatchIn(msg)
+ .map(_.group(1))
+ .getOrElse(msg)
+ Some(
+ QueryExecutionErrors.readCurrentFileNotFoundError(
+ new FileNotFoundException(s"File $path does not exist")))
+
case _ =>
None
}
diff --git
a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index e49c789a7..01d4eac4b 100644
---
a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++
b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -19,12 +19,19 @@
package org.apache.spark.sql.comet.shims
+import scala.util.matching.Regex
+
import org.apache.spark.QueryContext
import org.apache.spark.SparkException
+import org.apache.spark.SparkFileNotFoundException
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+object ShimSparkErrorConverter {
+ val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r
+}
+
/**
* Spark 4.0-specific implementation for converting error types to proper
Spark exceptions.
*/
@@ -251,6 +258,20 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError(
context.headOption.orNull))
+ case "FileNotFound" =>
+ val msg = params("message").toString
+ // Extract file path from native error message and format like Hadoop's
+ // FileNotFoundException: "File <path> does not exist"
+ val path = ShimSparkErrorConverter.ObjectLocationPattern
+ .findFirstMatchIn(msg)
+ .map(_.group(1))
+ .getOrElse(msg)
+ // readCurrentFileNotFoundError was removed in Spark 4.0; construct directly
+ Some(
+ new SparkFileNotFoundException(
+ errorClass = "_LEGACY_ERROR_TEMP_2055",
+ messageParameters = Map("message" -> s"File $path does not
exist")))
+
case _ =>
// Unknown error type - return None to trigger fallback
None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]