This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 03eb318f7119 [SPARK-52615][CORE] Replace File.mkdirs with Utils.createDirectory
03eb318f7119 is described below
commit 03eb318f7119ff33f2f4802caca2a9fb382eb0e5
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Jul 3 14:44:12 2025 +0800
[SPARK-52615][CORE] Replace File.mkdirs with Utils.createDirectory
### What changes were proposed in this pull request?
We hit an issue where `File.mkdirs()` may occasionally fail without reporting
any error during the submission phase (we didn't configure `spark.yarn.archive`
in that cluster, so each submission requires packaging and uploading the Spark
client jars, which consumes a lot of disk I/O). The same behavior is also noted
in a comment in `Utils.createDirectory`:
> // SPARK-35907: The check was required by File.mkdirs() because it could
> // sporadically fail silently. ...
```
25/06/27 19:12:17 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/06/27 19:12:17 ERROR Utils: Failed to create dir in ./local. Ignoring this directory.
25/06/27 19:12:17 INFO Client: Deleted staging directory hdfs://<cluster>/user/<user>/.sparkStaging/application_1747844918192_28291290
Exception in thread "main" java.io.IOException: Failed to get a temp directory under [./local].
    at org.apache.spark.util.Utils$.getLocalDir(Utils.scala:896)
    at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:672)
    at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:1005)
    at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:231)
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1352)
    at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1800)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1019)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1107)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1116)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
So I replaced `File.mkdirs` with `Utils.createDirectory` and deployed the
change to our internal cluster; no similar failures have occurred since then
... (not sure why, maybe the underlying NIO method is more robust?)
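For illustration only, here is a minimal sketch (not part of this patch; the
object name and path are made up) of why the NIO route reports more useful
information: `File.mkdirs()` only returns `false` on failure, while
`java.nio.file.Files.createDirectories` throws an exception that carries the
underlying reason:
```
import java.io.File
import java.nio.file.Files

object MkdirsVsNio {
  def main(args: Array[String]): Unit = {
    val dir = new File(args.headOption.getOrElse("./local/spark-staging"))

    // Legacy API: a silent boolean, no hint about *why* it failed
    // (permissions, disk full, a path component is a regular file, ...).
    if (!dir.mkdirs() && !dir.isDirectory) {
      println(s"mkdirs() failed for $dir, reason unknown")
    }

    // NIO API: failures surface as exceptions with a concrete cause,
    // e.g. java.nio.file.FileSystemException: ... (No space left on device).
    try {
      Files.createDirectories(dir.toPath)
    } catch {
      case e: Exception => println(s"createDirectories failed for $dir: $e")
    }
  }
}
```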
Additional context:
[JDK-4227544](https://bugs.openjdk.org/browse/JDK-4227544) "design bug:
File.mkdir(), etc. don't provide reason for failure" was closed as "Won't Fix":
> It is too late to change this now. The new io framework will handle this
> situation better.
### Why are the changes needed?
To report a meaningful error message when creating a directory fails.
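As a rough sketch of the kind of helper the patch switches to (the object name
here is hypothetical and the actual `Utils.createDirectory` implementation in
Spark may differ), a Boolean-returning wrapper over NIO can log the concrete
failure reason while keeping the contract expected by existing call sites such
as `if (!Utils.createDirectory(executorDir)) throw new IOException(...)`:
```
import java.io.File
import java.nio.file.Files

object DirUtilsSketch {
  // Delegate to NIO so failures carry their cause, log it, and return a
  // Boolean so callers can keep using the same control flow as with mkdirs().
  def createDirectory(dir: File): Boolean = {
    try {
      if (!dir.exists() || !dir.isDirectory) {
        Files.createDirectories(dir.toPath)
      }
    } catch {
      case e: Exception =>
        System.err.println(s"Failed to create directory $dir: $e")
    }
    dir.isDirectory
  }
}
```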
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
I made the same change in our internal Spark build and deployed it to a busy
YARN cluster; the submission process has been stable so far.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51322 from pan3793/SPARK-52615.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../spark/network/shuffle/TestShuffleDataContext.java | 3 ++-
.../scala/org/apache/spark/util/IvyTestUtils.scala | 12 ++++++------
.../scala/org/apache/spark/sql/avro/AvroSuite.scala | 2 +-
.../spark/streaming/kafka010/KafkaRDDSuite.scala | 3 ++-
.../scala/org/apache/spark/deploy/RPackageUtils.scala | 2 +-
.../org/apache/spark/deploy/worker/DriverRunner.scala | 2 +-
.../scala/org/apache/spark/deploy/worker/Worker.scala | 2 +-
.../src/main/scala/org/apache/spark/rdd/PipedRDD.scala | 2 +-
.../org/apache/spark/storage/DiskBlockManager.scala | 2 +-
core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++--
.../scala/org/apache/spark/SparkContextSuite.scala | 2 +-
.../org/apache/spark/benchmark/BenchmarkBase.scala | 3 ++-
.../org/apache/spark/deploy/RPackageUtilsSuite.scala | 4 ++--
.../org/apache/spark/deploy/SparkSubmitSuite.scala | 2 +-
.../spark/deploy/history/HistoryServerSuite.scala | 2 +-
.../org/apache/spark/deploy/worker/WorkerSuite.scala | 2 +-
.../apache/spark/scheduler/TaskResultGetterSuite.scala | 2 +-
.../org/apache/spark/storage/LocalDirsSuite.scala | 2 +-
.../spark/network/yarn/YarnShuffleServiceSuite.scala | 6 +++---
.../streaming/TransformWithStateConnectSuite.scala | 3 ++-
.../execution/streaming/state/RocksDBFileManager.scala | 4 ++--
.../org/apache/spark/sql/ExpressionsSchemaSuite.scala | 3 ++-
.../org/apache/spark/sql/ICUCollationsMapSuite.scala | 3 ++-
.../org/apache/spark/sql/PlanStabilitySuite.scala | 3 ++-
.../scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 2 +-
.../org/apache/spark/sql/TPCDSQueryTestSuite.scala | 3 ++-
.../datasources/FileSourceStrategySuite.scala | 4 ++--
.../sql/execution/datasources/json/JsonSuite.scala | 2 +-
.../datasources/orc/OrcPartitionDiscoverySuite.scala | 3 ++-
.../execution/datasources/parquet/ParquetTest.scala | 3 ++-
.../spark/sql/execution/python/RowQueueSuite.scala | 2 +-
.../spark/sql/sources/PartitionedWriteSuite.scala | 4 ++--
.../spark/sql/streaming/FileStreamSourceSuite.scala | 18 +++++++++---------
.../streaming/test/DataStreamReaderWriterSuite.scala | 6 +++---
.../hive/service/cli/session/HiveSessionImpl.java | 3 ++-
.../hive/service/cli/session/SessionManager.java | 3 ++-
.../ui/HiveThriftServer2ListenerSuite.scala | 3 ++-
.../hive/thriftserver/ui/ThriftServerPageSuite.scala | 3 ++-
.../sql/hive/HiveExternalCatalogVersionsSuite.scala | 2 +-
.../org/apache/spark/sql/hive/StatisticsSuite.scala | 8 ++++----
40 files changed, 79 insertions(+), 65 deletions(-)
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 49b17824c3c7..4b8dc33c6bf5 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.file.Files;
import com.google.common.io.Closeables;
@@ -54,7 +55,7 @@ public class TestShuffleDataContext {
localDirs[i] = JavaUtils.createDirectory(root,
"spark").getAbsolutePath();
for (int p = 0; p < subDirsPerLocalDir; p ++) {
- new File(localDirs[i], String.format("%02x", p)).mkdirs();
+ Files.createDirectories(new File(localDirs[i], String.format("%02x", p)).toPath());
}
}
}
diff --git
a/common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala
b/common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala
index 140de836622f..5120a229fe18 100644
--- a/common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala
+++ b/common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala
@@ -95,7 +95,7 @@ private[spark] object IvyTestUtils {
className: String,
packageName: String): Seq[(String, File)] = {
val rFilesDir = new File(dir, "R" + File.separator + "pkg")
- new File(rFilesDir, "R").mkdirs()
+ SparkFileUtils.createDirectory(new File(rFilesDir, "R"))
val contents =
s"""myfunc <- function(x) {
| SparkR:::callJStatic("$packageName.$className", "myFunc", x)
@@ -150,11 +150,11 @@ private[spark] object IvyTestUtils {
useIvyLayout: Boolean): File = {
if (useIvyLayout) {
val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true)
- ivyXmlPath.mkdirs()
+ SparkFileUtils.createDirectory(ivyXmlPath)
createIvyDescriptor(ivyXmlPath, artifact, dependencies)
} else {
val pomPath = pathFromCoordinate(artifact, tempPath, "pom", useIvyLayout)
- pomPath.mkdirs()
+ SparkFileUtils.createDirectory(pomPath)
createPom(pomPath, artifact, dependencies)
}
}
@@ -293,13 +293,13 @@ private[spark] object IvyTestUtils {
// Where the root of the repository exists, and what Ivy will search in
val tempPath = tempDir.getOrElse(SparkFileUtils.createTempDir())
// Create directory if it doesn't exist
- tempPath.mkdirs()
+ SparkFileUtils.createDirectory(tempPath)
// Where to create temporary class files and such
val root = new File(tempPath, tempPath.hashCode().toString)
- root.mkdirs()
+ SparkFileUtils.createDirectory(root)
try {
val jarPath = pathFromCoordinate(artifact, tempPath, "jar", useIvyLayout)
- jarPath.mkdirs()
+ SparkFileUtils.createDirectory(jarPath)
val className = "MyLib"
val javaClass = createJavaClass(root, className, artifact.groupId)
diff --git
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 6f345e069ff7..ccced0e1f0c1 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -1899,7 +1899,7 @@ abstract class AvroSuite
withTempPath { tempDir =>
val tempEmptyDir = s"$tempDir/sqlOverwrite"
// Create a temp directory for table that will be overwritten
- new File(tempEmptyDir).mkdirs()
+ Utils.createDirectory(tempEmptyDir)
spark.sql(
s"""
|CREATE TEMPORARY VIEW episodes
diff --git
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index 212693f6e02c..a1264af021aa 100644
---
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -38,6 +38,7 @@ import org.scalatest.concurrent.Eventually.{eventually,
interval, timeout}
import org.apache.spark._
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.streaming.kafka010.mocks.MockTime
+import org.apache.spark.util.Utils
class KafkaRDDSuite extends SparkFunSuite {
@@ -91,7 +92,7 @@ class KafkaRDDSuite extends SparkFunSuite {
val logs = new Pool[TopicPartition, UnifiedLog]()
val logDir = kafkaTestUtils.brokerLogDir
val dir = new File(logDir, topic + "-" + partition)
- dir.mkdirs()
+ Utils.createDirectory(dir)
val logProps = new ju.Properties()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT)
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG,
java.lang.Float.valueOf(0.1f))
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 5d996381a485..ac2c1f73bd09 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -153,7 +153,7 @@ private[deploy] object RPackageUtils extends Logging {
if (verbose) {
print(log"Creating directory: ${MDC(PATH, dir)}", printStream)
}
- dir.mkdirs
+ Utils.createDirectory(dir)
} else {
val inStream = jar.getInputStream(entry)
val outPath = new File(tempDir, entryPath)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index ca0e024ad1ae..22e4c83440f6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -145,7 +145,7 @@ private[deploy] class DriverRunner(
*/
private def createWorkingDirectory(): File = {
val driverDir = new File(workDir, driverId)
- if (!driverDir.exists() && !driverDir.mkdirs()) {
+ if (!driverDir.exists() && !Utils.createDirectory(driverDir)) {
throw new IOException("Failed to create directory " + driverDir)
}
driverDir
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index b2ec23887a40..488bc6c79131 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -600,7 +600,7 @@ private[deploy] class Worker(
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
- if (!executorDir.mkdirs()) {
+ if (!Utils.createDirectory(executorDir)) {
throw new IOException("Failed to create directory " + executorDir)
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 126c92e4cb65..b2b6d2a2959d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -90,7 +90,7 @@ private[spark] class PipedRDD[T: ClassTag](
val currentDir = new File(".")
logDebug("currentDir = " + currentDir.getAbsolutePath())
val taskDirFile = new File(taskDirectory)
- taskDirFile.mkdirs()
+ Utils.createDirectory(taskDirFile)
try {
val tasksDirFilter = new NotEqualsFileNameFilter("tasks")
diff --git
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 72d8dc0b19d2..94a0ea1ecaef 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -316,7 +316,7 @@ private[spark] class DiskBlockManager(
throw
SparkCoreErrors.failToCreateDirectoryError(dirToCreate.getAbsolutePath,
maxAttempts)
}
try {
- dirToCreate.mkdirs()
+ Utils.createDirectory(dirToCreate)
Files.setPosixFilePermissions(
dirToCreate.toPath, PosixFilePermissions.fromString("rwxrwx---"))
if (dirToCreate.exists()) {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2d3ae9f171fe..8f60857eb691 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -458,7 +458,7 @@ private[spark] object Utils
* to work around a security issue, see also SPARK-38631.
*/
private def unTarUsingJava(source: File, dest: File): Unit = {
- if (!dest.mkdirs && !dest.isDirectory) {
+ if (!Utils.createDirectory(dest) && !dest.isDirectory) {
throw new IOException(s"Mkdirs failed to create $dest")
} else {
try {
@@ -810,7 +810,7 @@ private[spark] object Utils
configuredLocalDirs.flatMap { root =>
try {
val rootDir = new File(root)
- if (rootDir.exists || rootDir.mkdirs()) {
+ if (rootDir.exists || Utils.createDirectory(rootDir)) {
val dir = createTempDir(root)
chmod700(dir)
Some(dir.getAbsolutePath)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 9e5859feefb5..a88240bc612a 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -455,7 +455,7 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
test("SPARK-22585 addJar argument without scheme is interpreted literally
without url decoding") {
withTempDir { dir =>
val tmpDir = new File(dir, "host%3A443")
- tmpDir.mkdirs()
+ Utils.createDirectory(tmpDir)
val tmpJar = File.createTempFile("t%2F", ".jar", tmpDir)
sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
index 5eb22032a5e8..ebb8609e8c13 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
@@ -20,6 +20,7 @@ package org.apache.spark.benchmark
import java.io.{File, FileOutputStream, OutputStream}
import org.apache.spark.internal.config.Tests.IS_TESTING
+import org.apache.spark.util.Utils
/**
* A base class for generate benchmark results to a file.
@@ -60,7 +61,7 @@ abstract class BenchmarkBase {
// scalastyle:off println
println(s"Creating ${dir.getAbsolutePath} for benchmark results.")
// scalastyle:on println
- dir.mkdirs()
+ Utils.createDirectory(dir)
}
val file = new File(dir, resultFileName)
if (!file.exists()) {
diff --git
a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 77f5268f79ca..67c1abdb0d55 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -147,12 +147,12 @@ class RPackageUtilsSuite
Utils.tryWithSafeFinally {
IvyTestUtils.writeFile(tempDir, "test.R", "abc")
val fakeSparkRDir = new File(tempDir, "SparkR")
- assert(fakeSparkRDir.mkdirs())
+ assert(Utils.createDirectory(fakeSparkRDir))
IvyTestUtils.writeFile(fakeSparkRDir, "abc.R", "abc")
IvyTestUtils.writeFile(fakeSparkRDir, "DESCRIPTION", "abc")
IvyTestUtils.writeFile(tempDir, "package.zip", "abc") // fake zip file :)
val fakePackageDir = new File(tempDir, "packageTest")
- assert(fakePackageDir.mkdirs())
+ assert(Utils.createDirectory(fakePackageDir))
IvyTestUtils.writeFile(fakePackageDir, "def.R", "abc")
IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index bd34e6f2bba3..1bf7d875c653 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -904,7 +904,7 @@ class SparkSubmitSuite
// compile a small jar containing a class that will be called from R code.
withTempDir { tempDir =>
val srcDir = new File(tempDir, "sparkrtest")
- srcDir.mkdirs()
+ Utils.createDirectory(srcDir)
val excSource = new JavaSourceFromString(new File(srcDir,
"DummyClass").toURI.getPath,
"""package sparkrtest;
|
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 4ac919dd9e6a..7cfcfe0e4cf3 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -220,7 +220,7 @@ abstract class HistoryServerSuite extends SparkFunSuite
with BeforeAndAfter with
if (regenerateGoldenFiles) {
FileUtils.deleteDirectory(expRoot)
- expRoot.mkdirs()
+ Utils.createDirectory(expRoot)
}
// run a bunch of characterization tests -- just verify the behavior is the
same as what is saved
diff --git
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index f3bae2066e14..ff5d314d1688 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -383,7 +383,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with
BeforeAndAfter with P
// Create the executor's working directory
val executorDir = new File(worker.workDir, appId + "/" + execId)
- if (!executorDir.exists && !executorDir.mkdirs()) {
+ if (!executorDir.exists && !Utils.createDirectory(executorDir)) {
throw new IOException("Failed to create directory " + executorDir)
}
executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 5a8722a55ed7..4c6b7f25e3c3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -207,7 +207,7 @@ class TaskResultGetterSuite extends SparkFunSuite with
BeforeAndAfter with Local
// compile a small jar containing an exception that will be thrown on an
executor.
val tempDir = Utils.createTempDir()
val srcDir = new File(tempDir, "repro/")
- srcDir.mkdirs()
+ Utils.createDirectory(srcDir)
val excSource = new JavaSourceFromString(new File(srcDir,
"MyException").toURI.getPath,
"""package repro;
|
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
index 4b22ec334e84..3a30444d0aab 100644
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -29,7 +29,7 @@ class LocalDirsSuite extends SparkFunSuite with
LocalRootDirsTest {
private def assumeNonExistentAndNotCreatable(f: File): Unit = {
try {
- assume(!f.exists() && !f.mkdirs())
+ assume(!f.exists() && !Utils.createDirectory(f))
} finally {
Utils.deleteRecursively(f)
}
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 56d7b7ff6a09..ca22329f60bb 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -129,12 +129,12 @@ abstract class YarnShuffleServiceSuite extends
SparkFunSuite with Matchers {
reduceId: Int,
blockId: String): AppShufflePartitionInfo = {
val dataFile = ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager,
partitionId, reduceId)
- dataFile.getParentFile.mkdirs()
+ Utils.createDirectory(dataFile.getParentFile)
val indexFile =
ShuffleTestAccessor.getMergedShuffleIndexFile(mergeManager, partitionId,
reduceId)
- indexFile.getParentFile.mkdirs()
+ Utils.createDirectory(indexFile.getParentFile)
val metaFile = ShuffleTestAccessor.getMergedShuffleMetaFile(mergeManager,
partitionId, reduceId)
- metaFile.getParentFile.mkdirs()
+ Utils.createDirectory(metaFile.getParentFile)
val partitionInfo = ShuffleTestAccessor.getOrCreateAppShufflePartitionInfo(
mergeManager, partitionId, reduceId, blockId)
diff --git
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/TransformWithStateConnectSuite.scala
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/TransformWithStateConnectSuite.scala
index 310b50dac1cc..f113f274d712 100644
---
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/TransformWithStateConnectSuite.scala
+++
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/TransformWithStateConnectSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.connect.test.{QueryTest,
RemoteSparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{ListState, MapState, OutputMode,
StatefulProcessor, StatefulProcessorWithInitialState, TimeMode, TimerValues,
TTLConfig, ValueState}
import org.apache.spark.sql.types._
+import org.apache.spark.util.SparkFileUtils
case class InputRowForConnectTest(key: String, value: String)
case class OutputRowForConnectTest(key: String, value: String)
@@ -494,7 +495,7 @@ class TransformWithStateConnectSuite
val file = Paths.get(inputPath).toFile
val parentDir = file.getParentFile
if (parentDir != null && !parentDir.exists()) {
- parentDir.mkdirs()
+ SparkFileUtils.createDirectory(parentDir)
}
val writer = new BufferedWriter(new FileWriter(inputPath))
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 562a57aafbd4..636103f58330 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -335,7 +335,7 @@ class RocksDBFileManager(
versionToRocksDBFiles.keySet().removeIf(_._1 >= version)
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
- localDir.mkdirs()
+ Utils.createDirectory(localDir)
// Since we cleared the local dir, we should also clear the local file
mapping
rocksDBFileMapping.clear()
RocksDBCheckpointMetadata(Seq.empty, 0)
@@ -828,7 +828,7 @@ class RocksDBFileManager(
private def getImmutableFilesFromVersionZip(
version: Long, checkpointUniqueId: Option[String] = None):
Seq[RocksDBImmutableFile] = {
Utils.deleteRecursively(localTempDir)
- localTempDir.mkdirs()
+ Utils.createDirectory(localTempDir)
Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId),
localTempDir)
val metadataFile = localMetadataFile(localTempDir)
val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
index df6fc50dc59d..f689c88a1cac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.tags.ExtendedSQLTest
+import org.apache.spark.util.Utils
// scalastyle:off line.size.limit
/**
@@ -147,7 +148,7 @@ class ExpressionsSchemaSuite extends QueryTest with
SharedSparkSession {
val goldenOutput = (header ++ outputBuffer).mkString("\n")
val parent = resultFile.getParentFile
if (!parent.exists()) {
- assert(parent.mkdirs(), "Could not create directory: " + parent)
+ assert(Utils.createDirectory(parent), "Could not create directory: " + parent)
}
stringToFile(resultFile, goldenOutput)
// scalastyle:off println
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/ICUCollationsMapSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ICUCollationsMapSuite.scala
index 42d486bd7545..c9f0b9e54285 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ICUCollationsMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ICUCollationsMapSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile,
CollationFactory}
+import org.apache.spark.util.Utils
// scalastyle:off line.size.limit
/**
@@ -54,7 +55,7 @@ class ICUCollationsMapSuite extends SparkFunSuite {
}
val parent = collationsMapFile.getParentFile
if (!parent.exists()) {
- assert(parent.mkdirs(), "Could not create directory: " + parent)
+ assert(Utils.createDirectory(parent), "Could not create directory: " + parent)
}
stringToFile(collationsMapFile, goldenOutput)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
index ad424b3a7cc7..bf6aca29be75 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
@@ -31,6 +31,7 @@ import
org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec,
ValidateRequirements}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.tags.ExtendedSQLTest
+import org.apache.spark.util.Utils
// scalastyle:off line.size.limit
/**
@@ -123,7 +124,7 @@ trait PlanStabilitySuite extends
DisableAdaptiveExecutionSuite {
if (!foundMatch) {
FileUtils.deleteDirectory(dir)
- assert(dir.mkdirs())
+ assert(Utils.createDirectory(dir))
val file = new File(dir, "simplified.txt")
FileUtils.writeStringToFile(file, simplified, StandardCharsets.UTF_8)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 575a4ae69d1a..e4fe46d08fa7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -377,7 +377,7 @@ class SQLQueryTestSuite extends QueryTest with
SharedSparkSession with SQLHelper
val resultFile = new File(testCase.resultFile)
val parent = resultFile.getParentFile
if (!parent.exists()) {
- assert(parent.mkdirs(), "Could not create directory: " + parent)
+ assert(Utils.createDirectory(parent), "Could not create directory: " + parent)
}
stringToFile(resultFile, goldenOutput)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala
index c1246a167b8c..d83fed4bf9a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.util.{fileToString,
resourceToString, strin
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.TestSparkSession
import org.apache.spark.tags.ExtendedSQLTest
+import org.apache.spark.util.Utils
/**
* End-to-end tests to check TPCDS query results.
@@ -122,7 +123,7 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase
with SQLQueryTestHelp
}
val parent = goldenFile.getParentFile
if (!parent.exists()) {
- assert(parent.mkdirs(), "Could not create directory: " + parent)
+ assert(Utils.createDirectory(parent), "Could not create directory: " + parent)
}
stringToFile(goldenFile, goldenOutput)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 94a0501b74d4..4faeae51ca58 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -400,7 +400,7 @@ class FileSourceStrategySuite extends QueryTest with
SharedSparkSession {
Seq("p1=1/p2=2/p3=3/file1", "p1=1/p2=3/p3=3/file1").foreach { fileName
=>
val file = new File(tempDir, fileName)
- assert(file.getParentFile.exists() || file.getParentFile.mkdirs())
+ assert(file.getParentFile.exists() || Utils.createDirectory(file.getParentFile))
util.stringToFile(file, fileName)
}
@@ -682,7 +682,7 @@ class FileSourceStrategySuite extends QueryTest with
SharedSparkSession {
files.foreach {
case (name, size) =>
val file = new File(tempDir, name)
- assert(file.getParentFile.exists() || file.getParentFile.mkdirs())
+ assert(file.getParentFile.exists() || Utils.createDirectory(file.getParentFile))
util.stringToFile(file, "*" * size)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index d63a1a7a2248..0bc0dfeff05c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -4065,7 +4065,7 @@ abstract class JsonSuite
// Test scan with partitions.
withTempDir { dir =>
- new File(dir, "a=1/b=2/").mkdirs()
+ Utils.createDirectory(new File(dir, "a=1/b=2/"))
Files.write(new File(dir, "a=1/b=2/file.json").toPath, content)
checkAnswer(
spark.read.format("json").option("singleVariantColumn", "var")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
index ea839b8e1ef1..4de62156e3b9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
// The data where the partitioning key exists only in the directory structure.
case class OrcParData(intField: Int, stringField: String)
@@ -56,7 +57,7 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
new File(parent, child)
}
- assert(partDir.mkdirs(), s"Couldn't create directory $partDir")
+ assert(Utils.createDirectory(partDir), s"Couldn't create directory $partDir")
partDir
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index d108803d43e4..b7b082e32965 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
/**
* A helper trait that provides convenient facilities for Parquet testing.
@@ -105,7 +106,7 @@ private[sql] trait ParquetTest extends
FileBasedDataSourceTest {
new File(parent, child)
}
- assert(partDir.mkdirs(), s"Couldn't create directory $partDir")
+ assert(Utils.createDirectory(partDir), s"Couldn't create directory $partDir")
partDir
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
index 4314e0d0ee38..5cf1dea7d073 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
@@ -68,7 +68,7 @@ class RowQueueSuite extends SparkFunSuite with
EncryptionFunSuite {
encryptionTest("disk queue") { conf =>
val serManager = createSerializerManager(conf)
val dir = Utils.createTempDir().getCanonicalFile
- dir.mkdirs()
+ Utils.createDirectory(dir)
val queue = DiskRowQueue(new File(dir, "buffer"), 1, serManager)
val row = new UnsafeRow(1)
row.pointTo(new Array[Byte](16), 16)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index b18d8f816e30..55bee7d4713d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -264,9 +264,9 @@ private class PartitionFileExistCommitProtocol(
override def setupJob(jobContext: JobContext): Unit = {
super.setupJob(jobContext)
val stagingDir = new File(new Path(path).toUri.getPath,
s".spark-staging-$jobId")
- stagingDir.mkdirs()
+ Utils.createDirectory(stagingDir)
val stagingPartDir = new File(stagingDir, "p1=2")
- stagingPartDir.mkdirs()
+ Utils.createDirectory(stagingPartDir)
val conflictTaskFile = new File(stagingPartDir,
s"part-00000-$jobId.c000.snappy.parquet")
conflictTaskFile.createNewFile()
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index a753da116924..a6a044c302ce 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -106,7 +106,7 @@ abstract class FileStreamSourceTest
override def addData(source: FileStreamSource): Unit = {
val tempFile = Utils.tempFileWith(new File(tmp, tmpFilePrefix))
val finalFile = new File(src, tempFile.getName)
- src.mkdirs()
+ Utils.createDirectory(src)
require(stringToFile(tempFile, content).renameTo(finalFile))
logInfo(s"Written text '$content' to file $finalFile")
}
@@ -127,7 +127,7 @@ abstract class FileStreamSourceTest
def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
val tmpDir = Utils.tempFileWith(new File(tmp, "orc"))
df.write.orc(tmpDir.getCanonicalPath)
- src.mkdirs()
+ Utils.createDirectory(src)
tmpDir.listFiles().foreach { f =>
f.renameTo(new File(src, s"${f.getName}"))
}
@@ -149,7 +149,7 @@ abstract class FileStreamSourceTest
def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
val tmpDir = Utils.tempFileWith(new File(tmp, "parquet"))
df.write.parquet(tmpDir.getCanonicalPath)
- src.mkdirs()
+ Utils.createDirectory(src)
tmpDir.listFiles().foreach { f =>
f.renameTo(new File(src, s"${f.getName}"))
}
@@ -664,7 +664,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
withTempDirs { case (baseSrc, tmp) =>
withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
val src = new File(baseSrc, "type=X")
- src.mkdirs()
+ Utils.createDirectory(src)
// Add a file so that we can infer its schema
stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c':
'keep2'}\n{'c': 'keep3'}")
@@ -1451,7 +1451,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
test("explain") {
withTempDirs { case (src, tmp) =>
- src.mkdirs()
+ Utils.createDirectory(src)
val df =
spark.readStream.format("text").load(src.getCanonicalPath).map(_.toString +
"-x")
// Test `explain` not throwing errors
@@ -1500,7 +1500,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
withTempDirs { case (root, tmp) =>
val src = new File(root, "a=1")
- src.mkdirs()
+ Utils.createDirectory(src)
(1 to numFiles).map { _.toString }.foreach { i =>
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
@@ -1924,8 +1924,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
withTempDirs { case (dir, tmp) =>
val sourceDir1 = new File(dir, "source1")
val sourceDir2 = new File(dir, "source2")
- sourceDir1.mkdirs()
- sourceDir2.mkdirs()
+ Utils.createDirectory(sourceDir1)
+ Utils.createDirectory(sourceDir2)
val source1 = createFileStream("text", s"${sourceDir1.getCanonicalPath}")
val source2 = createFileStream("text", s"${sourceDir2.getCanonicalPath}")
@@ -2595,7 +2595,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
val finalFile = new File(src, tempFile.getName)
require(!src.exists(), s"$src exists, dir: ${src.isDirectory}, file:
${src.isFile}")
- require(src.mkdirs(), s"Cannot create $src")
+ require(Utils.createDirectory(src), s"Cannot create $src")
require(src.isDirectory(), s"$src is not a directory")
require(stringToFile(tempFile, content).renameTo(finalFile))
finalFile
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 200603cae586..6ea610857b07 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -567,7 +567,7 @@ class DataStreamReaderWriterSuite extends StreamTest with
BeforeAndAfter {
test("MemorySink can recover from a checkpoint in Complete Mode") {
val checkpointLoc = newMetadataDir
val checkpointDir = new File(checkpointLoc, "offsets")
- checkpointDir.mkdirs()
+ Utils.createDirectory(checkpointDir)
assert(checkpointDir.exists())
testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = true)
}
@@ -575,7 +575,7 @@ class DataStreamReaderWriterSuite extends StreamTest with
BeforeAndAfter {
test("SPARK-18927: MemorySink can recover from a checkpoint provided in conf
in Complete Mode") {
val checkpointLoc = newMetadataDir
val checkpointDir = new File(checkpointLoc, "offsets")
- checkpointDir.mkdirs()
+ Utils.createDirectory(checkpointDir)
assert(checkpointDir.exists())
withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointLoc) {
testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = false)
@@ -588,7 +588,7 @@ class DataStreamReaderWriterSuite extends StreamTest with
BeforeAndAfter {
val df = ms.toDF().toDF("a")
val checkpointLoc = newMetadataDir
val checkpointDir = new File(checkpointLoc, "offsets")
- checkpointDir.mkdirs()
+ Utils.createDirectory(checkpointDir)
assert(checkpointDir.exists())
val e = intercept[AnalysisException] {
diff --git
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 9d9b6f1c7b0e..680c16cac74d 100644
---
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -75,6 +75,7 @@ import org.apache.spark.internal.SparkLogger;
import org.apache.spark.internal.SparkLoggerFactory;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;
+import org.apache.spark.util.Utils;
import static org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX;
import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX;
@@ -304,7 +305,7 @@ public class HiveSessionImpl implements HiveSession {
if (!operationLogRootDir.exists()) {
LOG.warn("The operation log root directory is removed, recreating: {}",
MDC.of(LogKeys.PATH$.MODULE$, operationLogRootDir.getAbsolutePath()));
- if (!operationLogRootDir.mkdirs()) {
+ if (!Utils.createDirectory(operationLogRootDir)) {
LOG.warn("Unable to create operation log root directory: {}",
MDC.of(LogKeys.PATH$.MODULE$,
operationLogRootDir.getAbsolutePath()));
}
diff --git
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
index 3f60fd00b82a..83d4c7a3622f 100644
---
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
+++
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -43,6 +43,7 @@ import org.apache.spark.internal.SparkLogger;
import org.apache.spark.internal.SparkLoggerFactory;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;
+import org.apache.spark.util.Utils;
/**
* SessionManager.
@@ -126,7 +127,7 @@ public class SessionManager extends CompositeService {
}
if (!operationLogRootDir.exists()) {
- if (!operationLogRootDir.mkdirs()) {
+ if (!Utils.createDirectory(operationLogRootDir)) {
LOG.warn("Unable to create operation log root directory: {}",
MDC.of(LogKeys.PATH$.MODULE$,
operationLogRootDir.getAbsolutePath()));
isOperationLogEnabled = false;
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
index 62d97772bcbc..138f979d7bc3 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.scheduler.SparkListenerJobStart
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.InMemoryStore
class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter
{
@@ -39,7 +40,7 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite
with BeforeAndAfter {
val tmpDirName = System.getProperty("java.io.tmpdir")
val tmpDir = new File(tmpDirName)
if (!tmpDir.exists()) {
- tmpDir.mkdirs()
+ Utils.createDirectory(tmpDir)
}
super.beforeAll()
}
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala
index 7cf17a089ea6..806eabc96fe3 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.scheduler.SparkListenerJobStart
import org.apache.spark.sql.hive.thriftserver._
import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.InMemoryStore
@@ -39,7 +40,7 @@ class ThriftServerPageSuite extends SparkFunSuite with
BeforeAndAfter {
val tmpDirName = System.getProperty("java.io.tmpdir")
val tmpDir = new File(tmpDirName)
if (!tmpDir.exists()) {
- tmpDir.mkdirs()
+ Utils.createDirectory(tmpDir)
}
super.beforeAll()
}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 58ca4a4ad1cf..6581d39c707e 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -142,7 +142,7 @@ class HiveExternalCatalogVersionsSuite extends
SparkSubmitTestUtils {
val outDir = new File(targetDir)
if (!outDir.exists()) {
- outDir.mkdirs()
+ Utils.createDirectory(outDir)
}
// propagate exceptions up to the caller of getFileFromUrl
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e2f0040afe57..2af4d01fcfb8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1068,14 +1068,14 @@ class StatisticsSuite extends
StatisticsCollectionTestBase with TestHiveSingleto
withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
val partDir1 = new File(new File(dir1, "ds=2008-04-09"), "hr=11")
val file1 = new File(partDir1, "data")
- file1.getParentFile.mkdirs()
+ Utils.createDirectory(file1.getParentFile)
Utils.tryWithResource(new PrintWriter(file1)) { writer =>
writer.write("1,a")
}
val partDir2 = new File(new File(dir2, "ds=2008-04-09"), "hr=12")
val file2 = new File(partDir2, "data")
- file2.getParentFile.mkdirs()
+ Utils.createDirectory(file2.getParentFile)
Utils.tryWithResource(new PrintWriter(file2)) { writer =>
writer.write("1,a")
}
@@ -1670,14 +1670,14 @@ class StatisticsSuite extends
StatisticsCollectionTestBase with TestHiveSingleto
withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
val partDir1 = new File(new File(dir1, "ds=2008-04-09"), "hr=11")
val file1 = new File(partDir1, "data")
- file1.getParentFile.mkdirs()
+ Utils.createDirectory(file1.getParentFile)
Utils.tryWithResource(new PrintWriter(file1)) { writer =>
writer.write("1,a")
}
val partDir2 = new File(new File(dir2, "ds=2008-04-09"), "hr=12")
val file2 = new File(partDir2, "data")
- file2.getParentFile.mkdirs()
+ Utils.createDirectory(file2.getParentFile)
Utils.tryWithResource(new PrintWriter(file2)) { writer =>
writer.write("1,a")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]