This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d02fbba6491 [SPARK-46342][SQL] Replace `IllegalStateException` by
`SparkException.internalError` in sql
d02fbba6491 is described below
commit d02fbba6491fd17dc6bfc1a416971af7544952f3
Author: Max Gekk <[email protected]>
AuthorDate: Sun Dec 10 11:24:02 2023 -0800
[SPARK-46342][SQL] Replace `IllegalStateException` by
`SparkException.internalError` in sql
### What changes were proposed in this pull request?
In the PR, I propose to replace all `IllegalStateException` exceptions in
the `sql` project, except for `streaming`, with `SparkException.internalError`.
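For illustration, here is a minimal before/after sketch of the pattern applied
throughout this diff (taken from the `ArrowUtils` hunk below). The helper
`SparkException.internalError` wraps the message in the `INTERNAL_ERROR` error
class:

```scala
import org.apache.spark.SparkException

// Before: an opaque JVM exception with no error class.
// throw new IllegalStateException("Missing timezoneId where it is mandatory.")

// After: a SparkException tagged with the INTERNAL_ERROR error class, so the
// message is reported through the unified error framework.
throw SparkException.internalError("Missing timezoneId where it is mandatory.")
```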
### Why are the changes needed?
This is a part of the migration onto the new error framework and error classes.
### Does this PR introduce _any_ user-facing change?
No, users shouldn't face `IllegalStateException` in regular cases.
### How was this patch tested?
By running the existing GitHub Actions (GAs).
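The updated suites assert the new error class instead of catching
`IllegalStateException`; for example, the `ArrowUtilsSuite` change below
follows this pattern:

```scala
checkError(
  exception = intercept[SparkException] {
    roundtrip(TimestampType)  // triggers the internal error from ArrowUtils
  },
  errorClass = "INTERNAL_ERROR",
  parameters = Map("message" -> "Missing timezoneId where it is mandatory."))
```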
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44275 from MaxGekk/replace-ise-by-internal-error.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/catalyst/util/TimestampFormatter.scala | 5 +++--
.../scala/org/apache/spark/sql/util/ArrowUtils.scala | 3 ++-
.../execution/datasources/v2/DataSourceV2Relation.scala | 2 +-
.../org/apache/spark/sql/util/ArrowUtilsSuite.scala | 12 +++++++-----
.../main/scala/org/apache/spark/sql/SparkSession.scala | 6 +++---
.../apache/spark/sql/api/python/PythonSQLUtils.scala | 3 ++-
.../sql/catalyst/analysis/ResolveSessionCatalog.scala | 3 ++-
.../spark/sql/execution/OptimizeMetadataOnlyQuery.scala | 3 ++-
.../org/apache/spark/sql/execution/SQLExecution.scala | 4 ++--
.../org/apache/spark/sql/execution/SparkSqlParser.scala | 3 ++-
.../apache/spark/sql/execution/SparkStrategies.scala | 17 +++++++++--------
.../spark/sql/execution/WholeStageCodegenExec.scala | 4 ++--
.../sql/execution/adaptive/AQEShuffleReadExec.scala | 8 ++++----
.../spark/sql/execution/adaptive/QueryStageExec.scala | 8 ++++----
.../execution/aggregate/AggregateCodegenSupport.scala | 3 ++-
.../sql/execution/aggregate/BaseAggregateExec.scala | 7 ++++---
.../execution/aggregate/ObjectAggregationIterator.scala | 4 ++--
.../aggregate/TungstenAggregationIterator.scala | 4 ++--
.../sql/execution/aggregate/UpdatingSessionsExec.scala | 3 ++-
.../execution/analysis/DetectAmbiguousSelfJoin.scala | 4 +++-
.../spark/sql/execution/basicPhysicalOperators.scala | 8 ++++----
.../spark/sql/execution/columnar/InMemoryRelation.scala | 4 ++--
.../columnar/compression/compressionSchemes.scala | 5 +++--
.../spark/sql/execution/datasources/DataSource.scala | 4 ++--
.../sql/execution/datasources/DataSourceUtils.scala | 6 +++---
.../sql/execution/datasources/jdbc/DriverRegistry.scala | 3 ++-
.../datasources/parquet/ParquetWriteSupport.scala | 4 ++--
.../execution/datasources/v2/DataSourceV2Strategy.scala | 6 ++++--
.../sql/execution/exchange/ShuffleExchangeExec.scala | 4 ++--
.../apache/spark/sql/execution/metric/SQLMetrics.scala | 4 ++--
.../spark/sql/execution/python/EvaluatePython.scala | 5 +++--
.../spark/sql/execution/python/ExtractPythonUDFs.scala | 7 ++++---
.../python/FlatMapGroupsInPandasWithStateExec.scala | 4 ++--
.../spark/sql/execution/window/AggregateProcessor.scala | 3 ++-
.../execution/window/WindowEvaluatorFactoryBase.scala | 9 +++++----
.../apache/spark/sql/expressions/ReduceAggregator.scala | 3 ++-
.../org/apache/spark/sql/SparkSessionBuilderSuite.scala | 2 +-
.../org/apache/spark/sql/execution/SparkPlanSuite.scala | 13 ++++++++-----
.../spark/sql/execution/WholeStageCodegenSuite.scala | 13 ++++++++-----
.../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 12 +++++++-----
.../spark/sql/expressions/ReduceAggregatorSuite.scala | 4 ++--
.../org/apache/spark/sql/hive/HiveInspectors.scala | 3 ++-
.../apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +-
.../apache/spark/sql/hive/execution/HiveTempPath.scala | 5 +++--
44 files changed, 136 insertions(+), 103 deletions(-)
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index df146e0dbfd..9539ced52dc 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
import org.apache.commons.lang3.time.FastDateFormat
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat,
LENIENT_SIMPLE_DATE_FORMAT}
import org.apache.spark.sql.catalyst.util.RebaseDateTime._
@@ -90,7 +91,7 @@ sealed trait TimestampFormatter extends Serializable {
@throws(classOf[DateTimeException])
@throws(classOf[IllegalStateException])
def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long =
- throw new IllegalStateException(
+ throw SparkException.internalError(
s"The method `parseWithoutTimeZone(s: String, allowTimeZone: Boolean)`
should be " +
"implemented in the formatter of timestamp without time zone")
@@ -137,7 +138,7 @@ sealed trait TimestampFormatter extends Serializable {
@throws(classOf[IllegalStateException])
def format(localDateTime: LocalDateTime): String =
- throw new IllegalStateException(
+ throw SparkException.internalError(
s"The method `format(localDateTime: LocalDateTime)` should be
implemented in the formatter " +
"of timestamp without time zone")
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
b/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
index 99c46c01785..f01014e1edb 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
@@ -26,6 +26,7 @@ import org.apache.arrow.vector.complex.MapVector
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision,
IntervalUnit, TimeUnit}
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
+import org.apache.spark.SparkException
import org.apache.spark.sql.errors.ExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.util.ArrayImplicits._
@@ -53,7 +54,7 @@ private[sql] object ArrowUtils {
case DecimalType.Fixed(precision, scale) => new
ArrowType.Decimal(precision, scale)
case DateType => new ArrowType.Date(DateUnit.DAY)
case TimestampType if timeZoneId == null =>
- throw new IllegalStateException("Missing timezoneId where it is
mandatory.")
+ throw SparkException.internalError("Missing timezoneId where it is
mandatory.")
case TimestampType => new ArrowType.Timestamp(TimeUnit.MICROSECOND,
timeZoneId)
case TimestampNTZType =>
new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 573b0274e95..8dae9904bc8 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -91,7 +91,7 @@ case class DataSourceV2Relation(
// when testing, throw an exception if this computeStats method is
called because stats should
// not be accessed before pushing the projection and filters to create a
scan. otherwise, the
// stats are not accurate because they are based on a full table scan of
all columns.
- throw new IllegalStateException(
+ throw SparkException.internalError(
s"BUG: computeStats called before pushdown on DSv2 relation: $name")
} else {
// when not testing, return stats because bad stats are better than
failing a query
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala
index 28ed061a71b..c0fa43ff9bd 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala
@@ -21,7 +21,7 @@ import java.time.ZoneId
import org.apache.arrow.vector.types.pojo.ArrowType
-import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkException, SparkFunSuite,
SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.LA
import org.apache.spark.sql.types._
@@ -50,10 +50,12 @@ class ArrowUtilsSuite extends SparkFunSuite {
roundtrip(DateType)
roundtrip(YearMonthIntervalType())
roundtrip(DayTimeIntervalType())
- val tsExMsg = intercept[IllegalStateException] {
- roundtrip(TimestampType)
- }
- assert(tsExMsg.getMessage.contains("timezoneId"))
+ checkError(
+ exception = intercept[SparkException] {
+ roundtrip(TimestampType)
+ },
+ errorClass = "INTERNAL_ERROR",
+ parameters = Map("message" -> "Missing timezoneId where it is
mandatory."))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
ArrowUtils.fromArrowType(new ArrowType.Int(8, false))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 9bc60f067dd..15eeca87dcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -26,7 +26,7 @@ import scala.jdk.CollectionConverters._
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
-import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
+import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext,
SparkException, TaskContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable,
Unstable}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
@@ -1217,7 +1217,7 @@ object SparkSession extends Logging {
*/
def active: SparkSession = {
getActiveSession.getOrElse(getDefaultSession.getOrElse(
- throw new IllegalStateException("No active or default Spark session
found")))
+ throw SparkException.internalError("No active or default Spark session
found")))
}
/**
@@ -1316,7 +1316,7 @@ object SparkSession extends Logging {
private def assertOnDriver(): Unit = {
if (TaskContext.get() != null) {
// we're accessing it during task execution, fail.
- throw new IllegalStateException(
+ throw SparkException.internalError(
"SparkSession should only be created and accessed on the driver.")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 3f0e9369c61..62e6cc07b3e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -24,6 +24,7 @@ import java.util.Locale
import net.razorvine.pickle.{Pickler, Unpickler}
+import org.apache.spark.SparkException
import org.apache.spark.api.python.DechunkedInputStream
import org.apache.spark.internal.Logging
import org.apache.spark.security.SocketAuthServer
@@ -159,7 +160,7 @@ private[sql] object PythonSQLUtils extends Logging {
case "HOUR" => Column(zero.copy(hours = e.expr))
case "MINUTE" => Column(zero.copy(mins = e.expr))
case "SECOND" => Column(zero.copy(secs = e.expr))
- case _ => throw new IllegalStateException(s"Got the unexpected unit
'$unit'.")
+ case _ => throw SparkException.internalError(s"Got the unexpected unit
'$unit'.")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index d44de0b260b..c00a4331803 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.commons.lang3.StringUtils
+import org.apache.spark.SparkException
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec}
@@ -154,7 +155,7 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
throw QueryCompilationErrors.commandNotSupportNestedColumnError(
"DESC TABLE COLUMN", toPrettySQL(child))
case _ =>
- throw new IllegalStateException(s"[BUG] unexpected column
expression: $column")
+ throw SparkException.internalError(s"[BUG] unexpected column
expression: $column")
}
// For CREATE TABLE [AS SELECT], we should use the v1 command if the
catalog is resolved to the
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 00b1ec749d7..1de7a565f11 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import java.util.Locale
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation,
SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
@@ -150,7 +151,7 @@ case class OptimizeMetadataOnlyQuery(catalog:
SessionCatalog) extends Rule[Logic
LocalRelation(partAttrs, partitionData)
case _ =>
- throw new IllegalStateException(s"unrecognized table scan node:
$relation, " +
+ throw SparkException.internalError(s"unrecognized table scan node:
$relation, " +
s"please turn off ${SQLConf.OPTIMIZER_METADATA_ONLY.key} and try
again.")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index b96b9c25dda..e839d2c0691 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.util.control.NonFatal
-import org.apache.spark.{ErrorMessageFormat, SparkThrowable,
SparkThrowableHelper}
+import org.apache.spark.{ErrorMessageFormat, SparkException, SparkThrowable,
SparkThrowableHelper}
import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION,
SPARK_JOB_INTERRUPT_ON_CANCEL}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX,
SPARK_EXECUTOR_PREFIX}
@@ -58,7 +58,7 @@ object SQLExecution extends Logging {
// started execution of a query didn't call withNewExecutionId. The
execution ID should be
// set by calling withNewExecutionId in the action that begins
execution, like
// Dataset.collect or DataFrameWriter.insertInto.
- throw new IllegalStateException("Execution ID should be set")
+ throw SparkException.internalError("Execution ID should be set")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d8e5d4f2270..2ef68887e87 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
import org.antlr.v4.runtime.{ParserRuleContext, Token}
import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView,
PersistedView, UnresolvedFunctionName, UnresolvedIdentifier}
import org.apache.spark.sql.catalyst.catalog._
@@ -269,7 +270,7 @@ class SparkSqlAstBuilder extends AstBuilder {
} else if (ctx.stringLit() != null) {
SetCatalogCommand(string(visitStringLit(ctx.stringLit())))
} else {
- throw new IllegalStateException("Invalid catalog name")
+ throw SparkException.internalError("Invalid catalog name")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index df770bd5eee..304ce0cd751 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import java.util.Locale
+import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{execution, AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
@@ -552,7 +553,7 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
if (distinctAggChildSets.length > 1) {
// This is a sanity check. We should not reach here when we have
multiple distinct
// column sets. Our `RewriteDistinctAggregates` should take care
this case.
- throw new IllegalStateException(
+ throw SparkException.internalError(
"You hit a query analyzer bug. Please report your query to Spark
user mailing list.")
}
@@ -782,27 +783,27 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
LocalTableScanExec(output, sink.allData.map(r => toRow(r).copy())) ::
Nil
case logical.Distinct(child) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"logical distinct operator should have been replaced by aggregate in
the optimizer")
case logical.Intersect(left, right, false) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"logical intersect operator should have been replaced by semi-join
in the optimizer")
case logical.Intersect(left, right, true) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"logical intersect operator should have been replaced by union,
aggregate" +
" and generate operators in the optimizer")
case logical.Except(left, right, false) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"logical except operator should have been replaced by anti-join in
the optimizer")
case logical.Except(left, right, true) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"logical except (all) operator should have been replaced by union,
aggregate" +
" and generate operators in the optimizer")
case logical.ResolvedHint(child, hints) =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"ResolvedHint operator should have been replaced by join hint in the
optimizer")
case Deduplicate(_, child) if !child.isStreaming =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"Deduplicate operator for non streaming data source should have been
replaced " +
"by aggregate in the optimizer")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 0aefb0649d0..058df24fc13 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import scala.collection.mutable
import scala.util.control.NonFatal
-import org.apache.spark.broadcast
+import org.apache.spark.{broadcast, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -402,7 +402,7 @@ trait CodegenSupport extends SparkPlan {
val errMsg = "Only leaf nodes and blocking nodes need to call
'limitNotReachedCond' " +
"in its data producing loop."
if (Utils.isTesting) {
- throw new IllegalStateException(errMsg)
+ throw SparkException.internalError(errMsg)
} else {
logWarning(s"[BUG] $errMsg Please open a JIRA ticket to report it.")
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
index 6b39ac70a62..12e8d0e2c60 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
@@ -69,7 +69,7 @@ case class AQEShuffleReadExec private(
case other => other
}
case _ =>
- throw new IllegalStateException("operating on canonicalization plan")
+ throw SparkException.internalError("operating on canonicalization
plan")
}
} else if (isCoalescedRead) {
// For coalesced shuffle read, the data distribution is not changed,
only the number of
@@ -90,7 +90,7 @@ case class AQEShuffleReadExec private(
case r: RoundRobinPartitioning =>
r.copy(numPartitions = partitionSpecs.length)
case other @ SinglePartition =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
"Unexpected partitioning for coalesced shuffle read: " + other)
case _ =>
// Spark plugins may have custom partitioning and may replace this
operator
@@ -163,7 +163,7 @@ case class AQEShuffleReadExec private(
assert(p.dataSize.isDefined)
p.dataSize.get
case p: PartialReducerPartitionSpec => p.dataSize
- case p => throw new IllegalStateException(s"unexpected $p")
+ case p => throw SparkException.internalError(s"unexpected $p")
})
} else {
None
@@ -253,7 +253,7 @@ case class AQEShuffleReadExec private(
sendDriverMetrics()
stage.shuffle.getShuffleRDD(partitionSpecs.toArray)
case _ =>
- throw new IllegalStateException("operating on canonicalized plan")
+ throw SparkException.internalError("operating on canonicalized plan")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index b941feb12fc..89e9de8b084 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.Future
-import org.apache.spark.{FutureAction, MapOutputStatistics}
+import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -179,7 +179,7 @@ case class ShuffleQueryStageExec(
case s: ShuffleExchangeLike => s
case ReusedExchangeExec(_, s: ShuffleExchangeLike) => s
case _ =>
- throw new IllegalStateException(s"wrong plan for shuffle stage:\n
${plan.treeString}")
+ throw SparkException.internalError(s"wrong plan for shuffle stage:\n
${plan.treeString}")
}
def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize
@@ -233,7 +233,7 @@ case class BroadcastQueryStageExec(
case b: BroadcastExchangeLike => b
case ReusedExchangeExec(_, b: BroadcastExchangeLike) => b
case _ =>
- throw new IllegalStateException(s"wrong plan for broadcast stage:\n
${plan.treeString}")
+ throw SparkException.internalError(s"wrong plan for broadcast stage:\n
${plan.treeString}")
}
override protected def doMaterialize(): Future[Any] = {
@@ -273,7 +273,7 @@ case class TableCacheQueryStageExec(
@transient val inMemoryTableScan = plan match {
case i: InMemoryTableScanExec => i
case _ =>
- throw new IllegalStateException(s"wrong plan for table cache stage:\n
${plan.treeString}")
+ throw SparkException.internalError(s"wrong plan for table cache stage:\n
${plan.treeString}")
}
@transient
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
index 1377a984223..9523bf1a1c0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.aggregate
+import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ExpressionEquals, UnsafeRow}
@@ -343,7 +344,7 @@ trait AggregateCodegenSupport
"length of at least one split function went over the JVM limit: " +
CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH
if (Utils.isTesting) {
- throw new IllegalStateException(errMsg)
+ throw SparkException.internalError(errMsg)
} else {
logInfo(errMsg)
None
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
index 2427a39751f..5391d580759 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.aggregate
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, AttributeSet, Expression, NamedExpression}
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
Final, PartialMerge}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples,
ClusteredDistribution, Distribution, UnspecifiedDistribution}
@@ -102,9 +103,9 @@ trait BaseAggregateExec extends UnaryExecNode with
PartitioningPreservingUnaryEx
StatefulOperatorPartitioning.getCompatibleDistribution(
exprs, parts, conf) :: Nil
- case _ =>
- throw new IllegalStateException("Expected to set the number of
partitions before " +
- "constructing required child distribution!")
+ case _ => throw SparkException.internalError(
+ "Expected to set the number of partitions before " +
+ "constructing required child distribution!")
}
} else {
ClusteredDistribution(exprs) :: Nil
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
index 4cc251a99db..57b8fd8570f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.aggregate
-import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -109,7 +109,7 @@ class ObjectAggregationIterator(
val defaultAggregationBuffer = createNewAggregationBuffer()
generateOutput(UnsafeRow.createFromByteArray(0, 0),
defaultAggregationBuffer)
} else {
- throw new IllegalStateException(
+ throw SparkException.internalError(
"This method should not be called when groupingExpressions is not
empty.")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index db567dcd15b..1ebf0d143bd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.aggregate
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.SparkOutOfMemoryError
import org.apache.spark.sql.catalyst.InternalRow
@@ -461,7 +461,7 @@ class TungstenAggregationIterator(
hashMap.free()
resultCopy
} else {
- throw new IllegalStateException(
+ throw SparkException.internalError(
"This method should not be called when groupingExpressions is not
empty.")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
index fee7e29f8ad..b5dfd4639d8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.aggregate
+import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute,
SortOrder}
@@ -73,7 +74,7 @@ case class UpdatingSessionsExec(
groupingWithoutSessionExpression, parts, conf) :: Nil
case _ =>
- throw new IllegalStateException("Expected to set the number of
partitions before " +
+ throw SparkException.internalError("Expected to set the number of
partitions before " +
"constructing required child distribution!")
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
index 7e9628c3851..c2925a3ba59 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.analysis
import scala.collection.mutable
+import org.apache.spark.SparkException
import org.apache.spark.sql.{Column, Dataset}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
AttributeSet, Cast, Equality, Expression, ExprId}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
@@ -95,7 +96,8 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] {
colRefs.foreach { ref =>
if (ids.contains(ref.datasetId)) {
if (ref.colPos < 0 || ref.colPos >= p.output.length) {
- throw new IllegalStateException("[BUG] Hit an invalid Dataset
column reference: " +
+ throw SparkException.internalError(
+ "Hit an invalid Dataset column reference: " +
s"$ref. Please open a JIRA ticket to report it.")
} else {
// When self-join happens, the analyzer asks the right side
plan to generate
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 2fd79935507..083858e4fe8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
-import org.apache.spark.{InterruptibleIterator, Partition, SparkContext,
TaskContext}
+import org.apache.spark.{InterruptibleIterator, Partition, SparkContext,
SparkException, TaskContext}
import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -866,15 +866,15 @@ case class SubqueryExec(name: String, child: SparkPlan,
maxNumRows: Option[Int]
}
protected override def doExecute(): RDD[InternalRow] = {
- throw new IllegalStateException("SubqueryExec.doExecute should never be
called")
+ throw SparkException.internalError("SubqueryExec.doExecute should never be
called")
}
override def executeTake(n: Int): Array[InternalRow] = {
- throw new IllegalStateException("SubqueryExec.executeTake should never be
called")
+ throw SparkException.internalError("SubqueryExec.executeTake should never
be called")
}
override def executeTail(n: Int): Array[InternalRow] = {
- throw new IllegalStateException("SubqueryExec.executeTail should never be
called")
+ throw SparkException.internalError("SubqueryExec.executeTail should never
be called")
}
override def stringArgs: Iterator[Any] = Iterator(name, child) ++
Iterator(s"[id=#$id]")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 750f49c25b6..af958208afd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
import org.apache.commons.lang3.StringUtils
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -61,7 +61,7 @@ class DefaultCachedBatchSerializer extends
SimpleMetricsCachedBatchSerializer {
schema: Seq[Attribute],
storageLevel: StorageLevel,
conf: SQLConf): RDD[CachedBatch] =
- throw new IllegalStateException("Columnar input is not supported")
+ throw SparkException.internalError("Columnar input is not supported")
override def convertInternalRowToCachedBatch(
input: RDD[InternalRow],
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index 453bc47c19f..46044f6919d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -22,6 +22,7 @@ import java.nio.ByteOrder
import scala.collection.mutable
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.types.{PhysicalBooleanType,
PhysicalByteType, PhysicalDataType, PhysicalDoubleType, PhysicalFloatType,
PhysicalIntegerType, PhysicalLongType, PhysicalShortType, PhysicalStringType}
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -350,7 +351,7 @@ private[columnar] case object RunLengthEncoding extends
CompressionScheme {
decompress0(columnVector, capacity, getInt, putInt)
case _: PhysicalLongType =>
decompress0(columnVector, capacity, getLong, putLong)
- case _ => throw new IllegalStateException("Not supported type in
RunLengthEncoding.")
+ case _ => throw SparkException.internalError("Not supported type in
RunLengthEncoding.")
}
}
}
@@ -520,7 +521,7 @@ private[columnar] case object DictionaryEncoding extends
CompressionScheme {
}
pos += 1
}
- case _ => throw new IllegalStateException("Not supported type in
DictionaryEncoding.")
+ case _ => throw SparkException.internalError("Not supported type in
DictionaryEncoding.")
}
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 668d2538e03..71b6d4b886b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -513,7 +513,7 @@ case class DataSource(
qe.assertCommandExecuted()
// Replace the schema with that of the DataFrame we just wrote out to
avoid re-inferring
copy(userSpecifiedSchema =
Some(outputColumns.toStructType.asNullable)).resolveRelation()
- case _ => throw new IllegalStateException(
+ case _ => throw SparkException.internalError(
s"${providingClass.getCanonicalName} does not allow create table as
select.")
}
}
@@ -531,7 +531,7 @@ case class DataSource(
disallowWritingIntervals(data.schema.map(_.dataType),
forbidAnsiIntervals = false)
DataSource.validateSchema(data.schema, sparkSession.sessionState.conf)
planForWritingFileFormat(format, mode, data)
- case _ => throw new IllegalStateException(
+ case _ => throw SparkException.internalError(
s"${providingClass.getCanonicalName} does not allow create table as
select.")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index a5c1f6613b4..cf02826baf3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization
-import org.apache.spark.SparkUpgradeException
+import org.apache.spark.{SparkException, SparkUpgradeException}
import org.apache.spark.sql.{SPARK_LEGACY_DATETIME_METADATA_KEY,
SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY,
SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
AttributeSet, Expression, ExpressionSet, PredicateHelper}
@@ -172,7 +172,7 @@ object DataSourceUtils extends PredicateHelper {
(SQLConf.PARQUET_REBASE_MODE_IN_READ.key,
ParquetOptions.DATETIME_REBASE_MODE)
case "Avro" =>
(SQLConf.AVRO_REBASE_MODE_IN_READ.key, "datetimeRebaseMode")
- case _ => throw new IllegalStateException(s"Unrecognized format
$format.")
+ case _ => throw SparkException.internalError(s"Unrecognized format
$format.")
}
QueryExecutionErrors.sparkUpgradeInReadingDatesError(format, config,
option)
}
@@ -182,7 +182,7 @@ object DataSourceUtils extends PredicateHelper {
case "Parquet INT96" => SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key
case "Parquet" => SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key
case "Avro" => SQLConf.AVRO_REBASE_MODE_IN_WRITE.key
- case _ => throw new IllegalStateException(s"Unrecognized format
$format.")
+ case _ => throw SparkException.internalError(s"Unrecognized format
$format.")
}
QueryExecutionErrors.sparkUpgradeInWritingDatesError(format, config)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
index 421fa4ddace..dd429778e1f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
@@ -22,6 +22,7 @@ import java.sql.{Driver, DriverManager}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
+import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -65,7 +66,7 @@ object DriverRegistry extends Logging {
case d: DriverWrapper if d.wrapped.getClass.getCanonicalName ==
className => d.wrapped
case d if d.getClass.getCanonicalName == className => d
}.getOrElse {
- throw new IllegalStateException(
+ throw SparkException.internalError(
s"Did not find registered driver with class $className")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index e410789504e..7194033e603 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.io.api.{Binary, RecordConsumer}
-import org.apache.spark.SPARK_VERSION_SHORT
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SPARK_LEGACY_DATETIME_METADATA_KEY,
SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY,
SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.catalyst.InternalRow
@@ -263,7 +263,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow]
with Logging {
case t: UserDefinedType[_] => makeWriter(t.sqlType)
- case _ => throw new IllegalStateException(s"Unsupported data type
$dataType.")
+ case _ => throw SparkException.internalError(s"Unsupported data type
$dataType.")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 16889b247f2..fe3140c8030 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -629,7 +629,8 @@ private[sql] object DataSourceV2Strategy extends Logging {
expressions.Not(rebuildExpressionFromFilter(not.child(),
translatedFilterToExpr))
case _ =>
translatedFilterToExpr.getOrElse(predicate,
- throw new IllegalStateException("Failed to rebuild Expression for
filter: " + predicate))
+ throw SparkException.internalError(
+ "Failed to rebuild Expression for filter: " + predicate))
}
}
@@ -642,7 +643,8 @@ private[sql] object DataSourceV2Strategy extends Logging {
protected[sql] def translateRuntimeFilterV2(expr: Expression):
Option[Predicate] = expr match {
case in @ InSubqueryExec(PushableColumnAndNestedColumn(name), _, _, _, _,
_) =>
val values = in.values().getOrElse {
- throw new IllegalStateException(s"Can't translate $in to v2 Predicate,
no subquery result")
+ throw SparkException.internalError(
+ s"Can't translate $in to v2 Predicate, no subquery result")
}
val literals = values.map(LiteralValue(_, in.child.dataType))
Some(new Predicate("IN", FieldReference(name) +: literals))
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 509f1e6a1e4..69705afbb7c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -306,7 +306,7 @@ object ShuffleExchangeExec {
case (partition, index) =>
(partition.toSeq(expressions.map(_.dataType)), index)
}.toMap
new KeyGroupedPartitioner(mutable.Map(valueMap.toSeq: _*), n)
- case _ => throw new IllegalStateException(s"Exchange not implemented for
$newPartitioning")
+ case _ => throw SparkException.internalError(s"Exchange not implemented
for $newPartitioning")
// TODO: Handle BroadcastPartitioning.
}
def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match
{
@@ -334,7 +334,7 @@ object ShuffleExchangeExec {
case SinglePartition => identity
case KeyGroupedPartitioning(expressions, _, _, _) =>
row => bindReferences(expressions, outputAttributes).map(_.eval(row))
- case _ => throw new IllegalStateException(s"Exchange not implemented for
$newPartitioning")
+ case _ => throw SparkException.internalError(s"Exchange not implemented
for $newPartitioning")
}
val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 69d288ae75c..f0e58766dc6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -24,7 +24,7 @@ import scala.concurrent.duration._
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -230,7 +230,7 @@ object SQLMetrics {
} else if (metricsType == NS_TIMING_METRIC) {
duration => Utils.msDurationToString(duration.nanos.toMillis)
} else {
- throw new IllegalStateException(s"unexpected metrics type:
$metricsType")
+ throw SparkException.internalError(s"unexpected metrics type:
$metricsType")
}
val validValues = values.filter(_ >= 0)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index 48db2560da9..3b18e843c6f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -24,6 +24,7 @@ import scala.jdk.CollectionConverters._
import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
+import org.apache.spark.SparkException
import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -182,9 +183,9 @@ object EvaluatePython {
case c if c.getClass.isArray =>
val array = c.asInstanceOf[Array[_]]
if (array.length != fields.length) {
- throw new IllegalStateException(
+ throw SparkException.internalError(
s"Input row doesn't have expected number of values required by
the schema. " +
- s"${fields.length} fields are required while ${array.length}
values are provided."
+ s"${fields.length} fields are required while ${array.length}
values are provided."
)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index 711da3ff3f1..dcd6603f649 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.python
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.SparkException
import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -262,7 +263,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] {
val evalTypes = validUdfs.map(_.evalType).toSet
if (evalTypes.size != 1) {
- throw new IllegalStateException(
+ throw SparkException.internalError(
"Expected udfs have the same evalType but got different
evalTypes: " +
evalTypes.mkString(","))
}
@@ -274,7 +275,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] {
| PythonEvalType.SQL_ARROW_BATCHED_UDF =>
ArrowEvalPython(validUdfs, resultAttrs, child, evalType)
case _ =>
- throw new IllegalStateException("Unexpected UDF evalType")
+ throw SparkException.internalError("Unexpected UDF evalType")
}
attributeMap ++=
validUdfs.map(canonicalizeDeterministic).zip(resultAttrs)
@@ -286,7 +287,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] {
// Other cases are disallowed as they are ambiguous or would require a
cartesian
// product.
udfs.map(canonicalizeDeterministic).filterNot(attributeMap.contains).foreach {
udf =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
s"Invalid PythonUDF $udf, requires attributes from more than one
child.")
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
index 32d010b00d0..105c5ca6493 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.python
-import org.apache.spark.{JobArtifactSet, TaskContext}
+import org.apache.spark.{JobArtifactSet, SparkException, TaskContext}
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
@@ -145,7 +145,7 @@ case class FlatMapGroupsInPandasWithStateExec(
case ProcessingTimeTimeout => batchTimestampMs.get
case EventTimeTimeout => eventTimeWatermarkForEviction.get
case _ =>
- throw new IllegalStateException(
+ throw SparkException.internalError(
s"Cannot filter timed out keys for $timeoutConf")
}
val timingOutPairs = stateManager.getAllState(store).filter { state =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index 68ea33572c8..60ce6b06890 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.window
import scala.collection.mutable
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -91,7 +92,7 @@ private[window] object AggregateProcessor {
updateExpressions ++= noOps
evaluateExpressions += imperative
case other =>
- throw new IllegalStateException(s"Unsupported aggregate function:
$other")
+ throw SparkException.internalError(s"Unsupported aggregate function:
$other")
}
// Create the projections.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
index 4491861dd9d..bdaccd43c1b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.window
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Add,
AggregateWindowFunction, Ascending, Attribute, BoundReference, CurrentRow,
DateAdd, DateAddYMInterval, DecimalAddNoOverflowCheck, Descending, Expression,
FrameLessOffsetWindowFunction, FrameType, IdentityProjection, IntegerLiteral,
MutableProjection, NamedExpression, OffsetWindowFunction, PythonFuncExpression,
RangeFrame, RowFrame, RowOrdering, SortOrder, SpecifiedWindowFrame, TimeAdd,
TimestampAddYMInterval, UnaryMinus, UnboundedFol [...]
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -76,7 +77,7 @@ trait WindowEvaluatorFactoryBase {
RowBoundOrdering(offset)
case (RowFrame, _) =>
- throw new IllegalStateException(s"Unhandled bound in windows
expressions: $bound")
+ throw SparkException.internalError(s"Unhandled bound in windows
expressions: $bound")
case (RangeFrame, CurrentRow) =>
val ordering = RowOrdering.create(orderSpec, childOutput)
@@ -119,7 +120,7 @@ trait WindowEvaluatorFactoryBase {
RangeBoundOrdering(ordering, current, bound)
case (RangeFrame, _) =>
- throw new IllegalStateException("Non-Zero range offsets are not
supported for windows " +
+ throw SparkException.internalError("Non-Zero range offsets are not
supported for windows " +
"with multiple order expressions.")
}
}
@@ -168,7 +169,7 @@ trait WindowEvaluatorFactoryBase {
case _ => collect("AGGREGATE", frame, e, f)
}
case f: AggregateWindowFunction => collect("AGGREGATE", frame, e,
f)
- case f => throw new IllegalStateException(s"Unsupported window
function: $f")
+ case f => throw SparkException.internalError(s"Unsupported window
function: $f")
}
case _ =>
}
@@ -275,7 +276,7 @@ trait WindowEvaluatorFactoryBase {
}
case _ =>
- throw new IllegalStateException(s"Unsupported factory: $key")
+ throw SparkException.internalError(s"Unsupported factory: $key")
}
// Keep track of the number of expressions. This is a side-effect in a
map...
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
index e897fdfe008..fd3df372a2d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.expressions
+import org.apache.spark.SparkException
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -72,7 +73,7 @@ private[sql] class ReduceAggregator[T: Encoder](func: (T, T)
=> T)
override def finish(reduction: (Boolean, T)): T = {
if (!reduction._1) {
- throw new IllegalStateException("ReduceAggregator requires at least one
input row")
+ throw SparkException.internalError("ReduceAggregator requires at least
one input row")
}
reduction._2
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 90082c92291..4ac05373e5a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -104,7 +104,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with
Eventually {
SparkSession.clearActiveSession()
assert(SparkSession.active == session)
SparkSession.clearDefaultSession()
- intercept[IllegalStateException](SparkSession.active)
+ intercept[SparkException](SparkSession.active)
session.stop()
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b14f4a405f6..058719f265d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -108,11 +108,14 @@ class SparkPlanSuite extends QueryTest with
SharedSparkSession {
val df = spark.range(10)
val planner = spark.sessionState.planner
val deduplicate = Deduplicate(df.queryExecution.analyzed.output,
df.queryExecution.analyzed)
- val err = intercept[IllegalStateException] {
- planner.plan(deduplicate)
- }
- assert(err.getMessage.contains("Deduplicate operator for non streaming
data source " +
- "should have been replaced by aggregate in the optimizer"))
+ checkError(
+ exception = intercept[SparkException] {
+ planner.plan(deduplicate)
+ },
+ errorClass = "INTERNAL_ERROR",
+ parameters = Map(
+ "message" -> ("Deduplicate operator for non streaming data source
should have been " +
+ "replaced by aggregate in the optimizer")))
}
test("SPARK-37221: The collect-like API in SparkPlan should support columnar
output") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index faefc240b79..18c6113416a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -857,16 +857,19 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
SQLConf.CODEGEN_METHOD_SPLIT_THRESHOLD.key -> "1",
"spark.sql.CodeGenerator.validParamLength" -> "0") {
withTable("t") {
- val expectedErrMsg = "Failed to split aggregate code into small
functions"
+ val expectedErrMsg = "Failed to split aggregate code into small
functions.*"
Seq(
// Test case without keys
"SELECT AVG(v) FROM VALUES(1) t(v)",
// Test case with keys
"SELECT k, AVG(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k").foreach {
query =>
- val e = intercept[IllegalStateException] {
- sql(query).collect()
- }
- assert(e.getMessage.contains(expectedErrMsg))
+ checkError(
+ exception = intercept[SparkException] {
+ sql(query).collect()
+ },
+ errorClass = "INTERNAL_ERROR",
+ parameters = Map("message" -> expectedErrMsg),
+ matchPVals = true)
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index a76360439e6..a67b1b69f6b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1014,11 +1014,13 @@ class AdaptiveQueryExecSuite
val read = reads.head
val c = read.canonicalized.asInstanceOf[AQEShuffleReadExec]
// we can't just call execute() because that has separate checks for
canonicalized plans
- val ex = intercept[IllegalStateException] {
- val doExecute = PrivateMethod[Unit](Symbol("doExecute"))
- c.invokePrivate(doExecute())
- }
- assert(ex.getMessage === "operating on canonicalized plan")
+ checkError(
+ exception = intercept[SparkException] {
+ val doExecute = PrivateMethod[Unit](Symbol("doExecute"))
+ c.invokePrivate(doExecute())
+ },
+ errorClass = "INTERNAL_ERROR",
+ parameters = Map("message" -> "operating on canonicalized plan"))
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala
index c1071373287..10122c041c4 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.expressions
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -72,7 +72,7 @@ class ReduceAggregatorSuite extends SparkFunSuite {
val func = (v1: Int, v2: Int) => v1 + v2
val aggregator: ReduceAggregator[Int] = new
ReduceAggregator(func)(Encoders.scalaInt)
- intercept[IllegalStateException] {
+ intercept[SparkException] {
aggregator.finish(aggregator.zero)
}
}
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 849f6f15189..ba87ad37130 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -29,6 +29,7 @@ import
org.apache.hadoop.hive.serde2.objectinspector.{StructField => HiveStructF
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo,
TypeInfoFactory}
+import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -916,7 +917,7 @@ private[hive] trait HiveInspectors {
toInspector(dt.sqlType)
// We will enumerate all of the possible constant expressions, throw
exception if we missed
case Literal(_, dt) =>
- throw new IllegalStateException(s"Hive doesn't support the constant type
[$dt].")
+ throw SparkException.internalError(s"Hive doesn't support the constant
type [$dt].")
// ideally, we don't test the foldable here(but in optimizer), however,
some of the
// Hive UDF / UDAF requires its argument to be constant objectinspector,
we do it eagerly.
case _ if expr.foldable => toInspector(Literal.create(expr.eval(),
expr.dataType))
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 3b9e2733352..72388a8d4b9 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -842,7 +842,7 @@ private[hive] class HiveClientImpl(
val maxResults = 100000
val results = runHive(sql, maxResults)
// It is very confusing when you only get back some of the results...
- if (results.size == maxResults) throw new IllegalStateException("RESULTS
POSSIBLY TRUNCATED")
+ if (results.size == maxResults) throw
SparkException.internalError("RESULTS POSSIBLY TRUNCATED")
results
}
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
index 6fd8892fa1f..b3b9ebea21c 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
+import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -54,7 +55,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf:
Configuration, path: P
if (allSupportedHiveVersions.contains(hiveVersion)) {
externalTempPath(path, stagingDir)
} else {
- throw new IllegalStateException("Unsupported hive version: " +
hiveVersion.fullVersion)
+ throw SparkException.internalError("Unsupported hive version: " +
hiveVersion.fullVersion)
}
}
@@ -143,7 +144,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf:
Configuration, path: P
stagingDirForCreating.foreach { stagingDir =>
val fs: FileSystem = stagingDir.getFileSystem(hadoopConf)
if (!FileUtils.mkdir(fs, stagingDir, true, hadoopConf)) {
- throw new IllegalStateException(
+ throw SparkException.internalError(
"Cannot create staging directory '" + stagingDir.toString + "'")
}
fs.deleteOnExit(stagingDir)