This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bede6141a639 [SPARK-46623][CORE][MLLIB][SQL] Replace SimpleDateFormat with DateTimeFormatter
bede6141a639 is described below
commit bede6141a639e26b48465089671ffee73a43933c
Author: beliefer <[email protected]>
AuthorDate: Thu Jan 18 02:20:45 2024 -0600
[SPARK-46623][CORE][MLLIB][SQL] Replace SimpleDateFormat with DateTimeFormatter
### What changes were proposed in this pull request?
This PR proposes to replace `SimpleDateFormat` with `DateTimeFormatter`.
### Why are the changes needed?
The javadoc of `SimpleDateFormat` itself recommends using `DateTimeFormatter` instead.

In addition, `DateTimeFormatter` offers better performance than `SimpleDateFormat`. Unlike `SimpleDateFormat`, it is also immutable and thread-safe, so a single instance can be shared rather than constructing a new formatter on every call.
Note: `SimpleDateFormat` and `DateTimeFormatter` are not fully compatible; for example, the inputs accepted by `parse` are not exactly the same.
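One concrete illustration of that `parse` difference, as a standalone sketch (not code from this patch): `SimpleDateFormat` is lenient by default and rolls out-of-range field values over, while `DateTimeFormatter` with its default SMART resolver rejects them.

```scala
import java.text.SimpleDateFormat
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale

object ParseDifferenceExample {
  def main(args: Array[String]): Unit = {
    // SimpleDateFormat is lenient by default: month 13 / day 45 rolls over,
    // so this silently yields a Date in February 2025.
    val sdf = new SimpleDateFormat("yyyy-MM-dd", Locale.US)
    println(sdf.parse("2024-13-45"))

    // DateTimeFormatter (SMART resolver by default) rejects the same input
    // instead of rolling it over.
    val dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.US)
    println(LocalDate.parse("2024-13-45", dtf)) // throws DateTimeParseException
  }
}
```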
### Does this PR introduce _any_ user-facing change?
'No'.
### How was this patch tested?
GA and manual tests.
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #44616 from beliefer/replace-sdf-with-dtf.
Authored-by: beliefer <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../org/apache/spark/kafka010/KafkaTokenUtil.scala | 14 +++++++++-----
.../scala/org/apache/spark/deploy/master/Master.scala | 19 ++++++++++++-------
.../scala/org/apache/spark/deploy/worker/Worker.scala | 15 ++++++++++-----
.../spark/internal/io/SparkHadoopWriterUtils.scala | 11 +++++++++--
.../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 11 ++++++++---
.../scala/org/apache/spark/rdd/NewHadoopRDD.scala | 12 ++++++++----
.../spark/mllib/pmml/export/PMMLModelExport.scala | 12 +++++++++---
.../sql/execution/streaming/MetricsReporter.scala | 12 ++++++++----
.../sql/execution/streaming/ProgressReporter.scala | 13 ++++++++-----
.../continuous/ContinuousTextSocketSource.scala | 5 ++---
.../streaming/sources/TextSocketSourceProvider.scala | 8 ++++++--
.../spark/sql/hive/execution/HiveTempPath.scala | 11 ++++++++---
12 files changed, 97 insertions(+), 46 deletions(-)
diff --git a/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index 497ba03b6e16..669491f14205 100644
--- a/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -18,7 +18,8 @@
package org.apache.spark.kafka010
import java.{util => ju}
-import java.text.SimpleDateFormat
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatter
import java.util.regex.Pattern
import scala.jdk.CollectionConverters._
@@ -46,6 +47,10 @@ import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
private[spark] object KafkaTokenUtil extends Logging {
val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
private val TOKEN_SERVICE_PREFIX = "kafka.server.delegation.token"
+ private val DATE_TIME_FORMATTER =
+ DateTimeFormatter
+ .ofPattern("yyyy-MM-dd'T'HH:mm")
+ .withZone(ZoneId.systemDefault())
private[kafka010] def getTokenService(identifier: String): Text =
new Text(s"$TOKEN_SERVICE_PREFIX.$identifier")
@@ -220,7 +225,6 @@ private[spark] object KafkaTokenUtil extends Logging {
private def printToken(token: DelegationToken): Unit = {
if (log.isDebugEnabled) {
- val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE",
"MAXDATE"))
val tokenInfo = token.tokenInfo
@@ -229,9 +233,9 @@ private[spark] object KafkaTokenUtil extends Logging {
REDACTION_REPLACEMENT_TEXT,
tokenInfo.owner,
tokenInfo.renewersAsString,
- dateFormat.format(tokenInfo.issueTimestamp),
- dateFormat.format(tokenInfo.expiryTimestamp),
- dateFormat.format(tokenInfo.maxTimestamp)))
+ DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(tokenInfo.issueTimestamp)),
+ DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(tokenInfo.expiryTimestamp)),
+ DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(tokenInfo.maxTimestamp))))
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a1dad0428a6e..19dc61860fac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,7 +17,8 @@
package org.apache.spark.deploy.master
-import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
import java.util.{Date, Locale}
import java.util.concurrent.{ScheduledFuture, TimeUnit}
@@ -56,10 +57,6 @@ private[deploy] class Master(
private val driverIdPattern = conf.get(DRIVER_ID_PATTERN)
private val appIdPattern = conf.get(APP_ID_PATTERN)
-
- // For application IDs
- private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
-
private val workerTimeoutMs = conf.get(WORKER_TIMEOUT) * 1000
private val retainedApplications = conf.get(RETAINED_APPLICATIONS)
private val retainedDrivers = conf.get(RETAINED_DRIVERS)
@@ -1236,7 +1233,8 @@ private[deploy] class Master(
/** Generate a new app ID given an app's submission date */
private def newApplicationId(submitDate: Date): String = {
- val appId = appIdPattern.format(createDateFormat.format(submitDate), nextAppNumber)
+ val appId = appIdPattern.format(
+ Master.DATE_TIME_FORMATTER.format(submitDate.toInstant), nextAppNumber)
nextAppNumber += 1
if (moduloAppNumber > 0) {
nextAppNumber %= moduloAppNumber
@@ -1264,7 +1262,8 @@ private[deploy] class Master(
}
private def newDriverId(submitDate: Date): String = {
- val appId = driverIdPattern.format(createDateFormat.format(submitDate), nextDriverNumber)
+ val appId = driverIdPattern.format(
+ Master.DATE_TIME_FORMATTER.format(submitDate.toInstant), nextDriverNumber)
nextDriverNumber += 1
appId
}
@@ -1311,6 +1310,12 @@ private[deploy] object Master extends Logging {
val SYSTEM_NAME = "sparkMaster"
val ENDPOINT_NAME = "Master"
+ // For application IDs
+ private val DATE_TIME_FORMATTER =
+ DateTimeFormatter
+ .ofPattern("yyyyMMddHHmmss", Locale.US)
+ .withZone(ZoneId.systemDefault())
+
def main(argStrings: Array[String]): Unit = {
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
exitOnUncaughtException = false))
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 785129e1d818..96ceb1e5e121 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -18,8 +18,9 @@
package org.apache.spark.deploy.worker
import java.io.{File, IOException}
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale, UUID}
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatter
+import java.util.{Locale, UUID}
import java.util.concurrent._
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
import java.util.function.Supplier
@@ -90,8 +91,6 @@ private[deploy] class Worker(
private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
- // For worker and executor IDs
- private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
private val HEARTBEAT_MILLIS = conf.get(WORKER_TIMEOUT) * 1000 / 4
@@ -821,7 +820,7 @@ private[deploy] class Worker(
}
private def generateWorkerId(): String = {
- workerIdPattern.format(createDateFormat.format(new Date), host, port)
+ workerIdPattern.format(Worker.DATE_TIME_FORMATTER.format(Instant.now()), host, port)
}
override def onStop(): Unit = {
@@ -937,6 +936,12 @@ private[deploy] object Worker extends Logging {
val ENDPOINT_NAME = "Worker"
private val SSL_NODE_LOCAL_CONFIG_PATTERN = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r
+ // For worker and executor IDs
+ private val DATE_TIME_FORMATTER =
+ DateTimeFormatter
+ .ofPattern("yyyyMMddHHmmss", Locale.US)
+ .withZone(ZoneId.systemDefault())
+
def main(argStrings: Array[String]): Unit = {
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
exitOnUncaughtException = false))
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
index 6ba6713b6999..2673f043a9d7 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
@@ -17,7 +17,8 @@
package org.apache.spark.internal.io
-import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
import java.util.{Date, Locale}
import scala.util.{DynamicVariable, Random}
@@ -39,6 +40,12 @@ object SparkHadoopWriterUtils {
private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
private val RAND = new Random()
+ // For job tracker IDs
+ private val DATE_TIME_FORMATTER =
+ DateTimeFormatter
+ .ofPattern("yyyyMMddHHmmss", Locale.US)
+ .withZone(ZoneId.systemDefault())
+
/**
* Create a job ID.
*
@@ -71,7 +78,7 @@ object SparkHadoopWriterUtils {
* @return a string for a job ID
*/
def createJobTrackerID(time: Date): String = {
- val base = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
+ val base = DATE_TIME_FORMATTER.format(time.toInstant)
var l1 = RAND.nextLong()
if (l1 < 0) {
l1 = -l1
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index bea1f3093fd7..908ce1b233c5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -18,10 +18,10 @@
package org.apache.spark.rdd
import java.io.{FileNotFoundException, IOException}
-import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
import java.util.{Date, Locale}
-import scala.collection.immutable.Map
import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -302,7 +302,7 @@ class HadoopRDD[K, V](
private var reader: RecordReader[K, V] = null
private val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(
- new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
+ HadoopRDD.DATE_TIME_FORMATTER.format(createTime.toInstant),
context.stageId(), theSplit.index, context.attemptNumber(), jobConf)
reader =
@@ -426,6 +426,11 @@ private[spark] object HadoopRDD extends Logging {
*/
val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+ private val DATE_TIME_FORMATTER =
+ DateTimeFormatter
+ .ofPattern("yyyyMMddHHmmss", Locale.US)
+ .withZone(ZoneId.systemDefault())
+
/**
* The three methods below are helpers for accessing the local map, a property of the SparkEnv of
* the local process.
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index a12d4a40b146..7db8531e4a59 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -18,8 +18,9 @@
package org.apache.spark.rdd
import java.io.{FileNotFoundException, IOException}
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale}
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatter
+import java.util.Locale
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
@@ -104,8 +105,11 @@ class NewHadoopRDD[K, V](
// private val serializableConf = new SerializableWritable(_conf)
private val jobTrackerId: String = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
- formatter.format(new Date())
+ val dateTimeFormatter =
+ DateTimeFormatter
+ .ofPattern("yyyyMMddHHmmss", Locale.US)
+ .withZone(ZoneId.systemDefault())
+ dateTimeFormatter.format(Instant.now())
}
@transient protected val jobId = new JobID(jobTrackerId, id)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala b/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala
index 285ca19e6ce9..838b0cefe625 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala
@@ -17,8 +17,9 @@
package org.apache.spark.mllib.pmml.`export`
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale}
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatter
+import java.util.Locale
import scala.beans.BeanProperty
@@ -26,6 +27,11 @@ import org.dmg.pmml.{Application, Header, PMML, Timestamp}
private[mllib] trait PMMLModelExport {
+ private val DATE_TIME_FORMATTER =
+ DateTimeFormatter
+ .ofPattern("yyyy-MM-dd'T'HH:mm:ss", Locale.US)
+ .withZone(ZoneId.systemDefault())
+
/**
* Holder of the exported model in PMML format
*/
@@ -34,7 +40,7 @@ private[mllib] trait PMMLModelExport {
val version = getClass.getPackage.getImplementationVersion
val app = new Application("Apache Spark MLlib").setVersion(version)
val timestamp = new Timestamp()
- .addContent(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss", Locale.US).format(new Date()))
+ .addContent(DATE_TIME_FORMATTER.format(Instant.now()))
val header = new Header()
.setApplication(app)
.setTimestamp(timestamp)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
index 600b16f86dbc..3919f8a2eb21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.execution.streaming
-import java.text.SimpleDateFormat
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
import com.codahale.metrics.{Gauge, MetricRegistry}
@@ -42,8 +43,10 @@ class MetricsReporter(
registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
registerGauge("latency", _.durationMs.getOrDefault("triggerExecution",
0L).longValue(), 0L)
- private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
- timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+ private val timestampFormat =
+ DateTimeFormatter
+ .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+ .withZone(DateTimeUtils.getZoneId("UTC"))
registerGauge("eventTime-watermark",
progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L)
@@ -53,7 +56,8 @@ class MetricsReporter(
private def convertStringDateToMillis(isoUtcDateStr: String) = {
if (isoUtcDateStr != null) {
- timestampFormat.parse(isoUtcDateStr).getTime
+ val zonedDateTime = ZonedDateTime.parse(isoUtcDateStr, timestampFormat)
+ zonedDateTime.toInstant.toEpochMilli
} else {
0L
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 2a9aa82a1485..c01f156e3d70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.execution.streaming
-import java.text.SimpleDateFormat
-import java.util.{Date, Optional, UUID}
+import java.time.Instant
+import java.time.format.DateTimeFormatter
+import java.util.{Optional, UUID}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -91,8 +92,10 @@ trait ProgressReporter extends Logging {
// The timestamp we report an event that has not executed anything
private var lastNoExecutionProgressEventTime = Long.MinValue
- private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
- timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+ private val timestampFormat =
+ DateTimeFormatter
+ .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+ .withZone(DateTimeUtils.getZoneId("UTC"))
@volatile
protected var currentStatus: StreamingQueryStatus = {
@@ -435,7 +438,7 @@ trait ProgressReporter extends Logging {
}
protected def formatTimestamp(millis: Long): String = {
- timestampFormat.format(new Date(millis))
+ timestampFormat.format(Instant.ofEpochMilli(millis))
}
/** Updates the message returned in `status`. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
index 5463a1fa4e99..b41b3c329712 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming.continuous
import java.io.{BufferedReader, InputStreamReader, IOException}
import java.net.Socket
import java.sql.Timestamp
-import java.util.Calendar
+import java.time.Instant
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.ListBuffer
@@ -185,8 +185,7 @@ class TextSocketContinuousStream(
TextSocketContinuousStream.this.synchronized {
currentOffset += 1
val newData = (line,
- Timestamp.valueOf(
- TextSocketReader.DATE_FORMAT.format(Calendar.getInstance().getTime()))
+ Timestamp.valueOf(TextSocketReader.DATE_TIME_FORMATTER.format(Instant.now()))
)
buckets(currentOffset % numPartitions) += toRow(newData)
.copy().asInstanceOf[UnsafeRow]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index e4251cc7d393..41b64f748225 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.execution.streaming.sources
-import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
import java.util
import java.util.Locale
@@ -108,5 +109,8 @@ object TextSocketReader {
val SCHEMA_REGULAR = StructType(Array(StructField("value", StringType)))
val SCHEMA_TIMESTAMP = StructType(Array(StructField("value", StringType),
StructField("timestamp", TimestampType)))
- val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+ val DATE_TIME_FORMATTER =
+ DateTimeFormatter
+ .ofPattern("yyyy-MM-dd HH:mm:ss", Locale.US)
+ .withZone(ZoneId.systemDefault())
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
index b3b9ebea21c4..eb8482da38e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.hive.execution
import java.io.IOException
import java.net.URI
-import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
import java.util.{Date, Locale, Random}
import scala.util.control.NonFatal
@@ -41,6 +42,11 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P
lazy val externalTempPath: Path = getExternalTmpPath(path)
+ private lazy val dateTimeFormatter =
+ DateTimeFormatter
+ .ofPattern("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
+ .withZone(ZoneId.systemDefault())
+
private def getExternalTmpPath(path: Path): Path = {
import org.apache.spark.sql.hive.client.hive._
@@ -117,8 +123,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P
private def executionId: String = {
val rand: Random = new Random
- val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
- "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+ "hive_" + dateTimeFormatter.format(new Date().toInstant) + "_" +
Math.abs(rand.nextLong)
}
def deleteTmpPath() : Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]