This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a2a41b7 [SPARK-26978][CORE][SQL] Avoid magic time constants
a2a41b7 is described below
commit a2a41b7bf2bfdcd1cff242013716ac7bd84bdacd
Author: Maxim Gekk <[email protected]>
AuthorDate: Tue Feb 26 09:08:12 2019 -0600
[SPARK-26978][CORE][SQL] Avoid magic time constants
## What changes were proposed in this pull request?
In the PR, I propose to refactor existing code related to date/time
conversions, replacing magic constants like `1000` and `1000000` with
`DateTimeUtils` constants and conversion functions from `java.util.concurrent.TimeUnit._`.
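For illustration, a minimal sketch of the before/after pattern (hypothetical names, not an excerpt from the diff below):

```scala
import java.util.concurrent.TimeUnit._

// Illustrative only: contrasts the magic-constant style with the TimeUnit style.
object ElapsedSecondsExample extends App {
  val startTime: Long = System.currentTimeMillis() - 5000

  // Before: a bare `/ 1000` hides which units are being converted.
  val elapsedSecondsBefore: Long = (System.currentTimeMillis() - startTime) / 1000

  // After: the millis-to-seconds conversion is explicit.
  val elapsedSecondsAfter: Long = MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)

  println(s"$elapsedSecondsBefore seconds == $elapsedSecondsAfter seconds")
}
```

Remaining literal factors in expression and codegen code are replaced with named `DateTimeUtils` constants such as `MICROS_PER_SECOND`.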
## How was this patch tested?
The changes are tested by existing test suites.
Closes #23878 from MaxGekk/magic-time-constants.
Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../org/apache/spark/BarrierTaskContext.scala | 12 +--
.../org/apache/spark/deploy/master/Master.scala | 5 +-
.../org/apache/spark/benchmark/Benchmark.scala | 4 +-
.../spark/sql/catalyst/expressions/Cast.scala | 34 ++++----
.../expressions/codegen/CodeGenerator.scala | 6 +-
.../catalyst/expressions/datetimeExpressions.scala | 8 +-
.../spark/sql/catalyst/expressions/hash.scala | 6 +-
.../sql/catalyst/optimizer/finishAnalysis.scala | 6 +-
.../plans/logical/EventTimeWatermark.scala | 4 +-
.../catalyst/rules/QueryExecutionMetering.scala | 4 +-
.../spark/sql/catalyst/util/DateTimeUtils.scala | 90 ++++++++++++----------
.../apache/spark/sql/catalyst/util/package.scala | 20 -----
.../spark/sql/catalyst/expressions/CastSuite.scala | 25 +++---
.../expressions/DateExpressionsSuite.scala | 17 ++--
.../scala/org/apache/spark/sql/SparkSession.scala | 3 +-
.../spark/sql/execution/DataSourceScanExec.scala | 10 ++-
.../org/apache/spark/sql/execution/SortExec.scala | 7 +-
.../execution/aggregate/HashAggregateExec.scala | 9 ++-
.../aggregate/ObjectHashAggregateExec.scala | 4 +-
.../sql/execution/basicPhysicalOperators.scala | 4 +-
.../apache/spark/sql/execution/command/ddl.scala | 3 +-
.../execution/exchange/BroadcastExchangeExec.scala | 6 +-
.../sql/execution/joins/ShuffledHashJoinExec.scala | 4 +-
.../streaming/EventTimeWatermarkExec.scala | 3 +-
.../sql/execution/streaming/FileStreamSource.scala | 5 +-
.../sql/execution/streaming/GroupStateImpl.scala | 3 +-
.../sql/execution/streaming/ProgressReporter.scala | 8 +-
.../streaming/continuous/ContinuousTrigger.scala | 2 +-
.../spark/sql/streaming/ProcessingTime.scala | 2 +-
.../org/apache/spark/sql/DateFunctionsSuite.scala | 47 ++++++-----
.../streaming/sources/TextSocketStreamSuite.scala | 4 +-
.../sql/streaming/EventTimeWatermarkSuite.scala | 9 ++-
.../org/apache/spark/sql/hive/HiveInspectors.scala | 5 +-
.../spark/sql/hive/client/HiveClientImpl.scala | 9 ++-
.../apache/spark/sql/hive/client/HiveShim.scala | 2 +-
.../streaming/util/RateLimitedOutputStream.scala | 2 +-
36 files changed, 210 insertions(+), 182 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 6a497af..2d842b9 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -109,8 +109,8 @@ class BarrierTaskContext private[spark] (
override def run(): Unit = {
logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) waiting " +
s"under the global sync since $startTime, has been waiting for " +
- s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch " +
- s"is $barrierEpoch.")
+ s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
+ s"current barrier epoch is $barrierEpoch.")
}
}
// Log the update of global sync every 60 seconds.
@@ -126,14 +126,14 @@ class BarrierTaskContext private[spark] (
barrierEpoch += 1
logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
"global sync successfully, waited for " +
- s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch is " +
- s"$barrierEpoch.")
+ s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
+ s"current barrier epoch is $barrierEpoch.")
} catch {
case e: SparkException =>
logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) failed " +
"to perform global sync, waited for " +
- s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch " +
- s"is $barrierEpoch.")
+ s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
+ s"current barrier epoch is $barrierEpoch.")
throw e
} finally {
timerTask.cancel()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b26da8a..3dd804b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -994,9 +994,10 @@ private[deploy] class Master(
val toRemove = workers.filter(_.lastHeartbeat < currentTime - workerTimeoutMs).toArray
for (worker <- toRemove) {
if (worker.state != WorkerState.DEAD) {
+ val workerTimeoutSecs = TimeUnit.MILLISECONDS.toSeconds(workerTimeoutMs)
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
- worker.id, workerTimeoutMs / 1000))
- removeWorker(worker, s"Not receiving heartbeat for ${workerTimeoutMs / 1000} seconds")
+ worker.id, workerTimeoutSecs))
+ removeWorker(worker, s"Not receiving heartbeat for $workerTimeoutSecs seconds")
} else {
if (worker.lastHeartbeat < currentTime - ((reaperIterations + 1) * workerTimeoutMs)) {
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
index bb389cd..df1ed28 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
@@ -148,13 +148,13 @@ private[spark] class Benchmark(
if (outputPerIteration) {
// scalastyle:off
- println(s"Iteration $i took ${runTime / 1000} microseconds")
+ println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds")
// scalastyle:on
}
i += 1
}
// scalastyle:off
- println(s" Stopped after $i iterations, ${runTimes.sum / 1000000} ms")
+ println(s" Stopped after $i iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
// scalastyle:on
val best = runTimes.min
val avg = runTimes.sum / runTimes.size
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index b20249f..d591c58 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions
import java.math.{BigDecimal => JavaBigDecimal}
+import java.util.concurrent.TimeUnit._
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
@@ -25,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.unsafe.types.UTF8String.{IntWrapper, LongWrapper}
@@ -374,7 +376,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
case ByteType =>
buildCast[Byte](_, b => longToTimestamp(b.toLong))
case DateType =>
- buildCast[Int](_, d => DateTimeUtils.daysToMillis(d, timeZone) * 1000)
+ buildCast[Int](_, d => MILLISECONDS.toMicros(DateTimeUtils.daysToMillis(d, timeZone)))
// TimestampWritable.decimalToTimestamp
case DecimalType() =>
buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -387,21 +389,21 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
}
private[this] def decimalToTimestamp(d: Decimal): Long = {
- (d.toBigDecimal * 1000000L).longValue()
+ (d.toBigDecimal * MICROS_PER_SECOND).longValue()
}
private[this] def doubleToTimestamp(d: Double): Any = {
- if (d.isNaN || d.isInfinite) null else (d * 1000000L).toLong
+ if (d.isNaN || d.isInfinite) null else (d * MICROS_PER_SECOND).toLong
}
// converting seconds to us
- private[this] def longToTimestamp(t: Long): Long = t * 1000000L
+ private[this] def longToTimestamp(t: Long): Long = SECONDS.toMicros(t)
// converting us to seconds
private[this] def timestampToLong(ts: Long): Long = {
- Math.floorDiv(ts, DateTimeUtils.MICROS_PER_SECOND)
+ Math.floorDiv(ts, MICROS_PER_SECOND)
}
// converting us to seconds in double
private[this] def timestampToDouble(ts: Long): Double = {
- ts / 1000000.0
+ ts / MICROS_PER_SECOND.toDouble
}
// DateConverter
@@ -411,7 +413,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
case TimestampType =>
// throw valid precision more than seconds, according to Hive.
// Timestamp.nanos is in 0 to 999,999,999, no more than a second.
- buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 1000L, timeZone))
+ buildCast[Long](_, t => DateTimeUtils.millisToDays(MICROSECONDS.toMillis(t), timeZone))
}
// IntervalConverter
@@ -927,7 +929,8 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass)
(c, evPrim, evNull) =>
code"""$evPrim =
- org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToDays($c / 1000L, $tz);"""
+ org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToDays(
+ $c / $MICROS_PER_MILLIS, $tz);"""
case _ =>
(c, evPrim, evNull) => code"$evNull = true;"
}
@@ -1034,7 +1037,8 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass)
(c, evPrim, evNull) =>
code"""$evPrim =
- org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMillis($c, $tz) * 1000;"""
+ org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMillis(
+ $c, $tz) * $MICROS_PER_MILLIS;"""
case DecimalType() =>
(c, evPrim, evNull) => code"$evPrim = ${decimalToTimestampCode(c)};"
case DoubleType =>
@@ -1043,7 +1047,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
if (Double.isNaN($c) || Double.isInfinite($c)) {
$evNull = true;
} else {
- $evPrim = (long)($c * 1000000L);
+ $evPrim = (long)($c * $MICROS_PER_SECOND);
}
"""
case FloatType =>
@@ -1052,7 +1056,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
if (Float.isNaN($c) || Float.isInfinite($c)) {
$evNull = true;
} else {
- $evPrim = (long)($c * 1000000L);
+ $evPrim = (long)($c * $MICROS_PER_SECOND);
}
"""
}
@@ -1069,14 +1073,14 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
}
private[this] def decimalToTimestampCode(d: ExprValue): Block = {
- val block = inline"new java.math.BigDecimal(1000000L)"
+ val block = inline"new java.math.BigDecimal($MICROS_PER_SECOND)"
code"($d.toBigDecimal().bigDecimal().multiply($block)).longValue()"
}
- private[this] def longToTimeStampCode(l: ExprValue): Block = code"$l * 1000000L"
+ private[this] def longToTimeStampCode(l: ExprValue): Block = code"$l * (long)$MICROS_PER_SECOND"
private[this] def timestampToIntegerCode(ts: ExprValue): Block =
- code"java.lang.Math.floorDiv($ts, 1000000L)"
+ code"java.lang.Math.floorDiv($ts, $MICROS_PER_SECOND)"
private[this] def timestampToDoubleCode(ts: ExprValue): Block =
- code"$ts / 1000000.0"
+ code"$ts / (double)$MICROS_PER_SECOND"
private[this] def castToBooleanCode(from: DataType): CastFunction = from match {
case StringType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 7c8f7cd..b9365f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -32,7 +32,7 @@ import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, InternalCompilerException, SimpleCompiler}
import org.codehaus.janino.util.ClassFile
-import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException}
+import org.apache.spark.{TaskContext, TaskKilledException}
import org.apache.spark.executor.InputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.CodegenMetrics
@@ -40,10 +40,10 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, MapData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.Platform
-import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.types._
import org.apache.spark.util.{ParentClassLoader, Utils}
@@ -1372,7 +1372,7 @@ object CodeGenerator extends Logging {
val startTime = System.nanoTime()
val result = doCompile(code)
val endTime = System.nanoTime()
- def timeMs: Double = (endTime - startTime).toDouble / 1000000
+ def timeMs: Double = (endTime - startTime).toDouble / NANOS_PER_MILLIS
CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length)
CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong)
logInfo(s"Code generated in $timeMs ms")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index ec59502..4cb0031 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -694,7 +694,7 @@ abstract class UnixTime
$javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
if (!${ev.isNull}) {
try {
- ${ev.value} = $formatterName.parse(${eval1.value}.toString()) / 1000000L;
+ ${ev.value} = $formatterName.parse(${eval1.value}.toString()) / $MICROS_PER_SECOND;
} catch (java.lang.IllegalArgumentException e) {
${ev.isNull} = true;
} catch (java.text.ParseException e) {
@@ -714,7 +714,7 @@ abstract class UnixTime
s"""
try {
${ev.value} = $tf$$.MODULE$$.apply($format.toString(), $tz, $locale)
- .parse($string.toString()) / 1000000L;
+ .parse($string.toString()) / $MICROS_PER_SECOND;
} catch (java.lang.IllegalArgumentException e) {
${ev.isNull} = true;
} catch (java.text.ParseException e) {
@@ -733,7 +733,7 @@ abstract class UnixTime
boolean ${ev.isNull} = ${eval1.isNull};
$javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
if (!${ev.isNull}) {
- ${ev.value} = ${eval1.value} / 1000000L;
+ ${ev.value} = ${eval1.value} / $MICROS_PER_SECOND;
}""")
case DateType =>
val tz = ctx.addReferenceObj("timeZone", timeZone)
@@ -744,7 +744,7 @@ abstract class UnixTime
boolean ${ev.isNull} = ${eval1.isNull};
$javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
if (!${ev.isNull}) {
- ${ev.value} = $dtu.daysToMillis(${eval1.value}, $tz) / 1000L;
+ ${ev.value} = $dtu.daysToMillis(${eval1.value}, $tz) / $MILLIS_PER_SECOND;
}""")
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 742a4f8..8d17b07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.math.{BigDecimal, RoundingMode}
import java.security.{MessageDigest, NoSuchAlgorithmException}
+import java.util.concurrent.TimeUnit._
import java.util.zip.CRC32
import scala.annotation.tailrec
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.hash.Murmur3_x86_32
@@ -863,8 +865,8 @@ object HiveHashFunction extends InterpretedHashFunction {
* Mimics TimestampWritable.hashCode() in Hive
*/
def hashTimestamp(timestamp: Long): Long = {
- val timestampInSeconds = timestamp / 1000000
- val nanoSecondsPortion = (timestamp % 1000000) * 1000
+ val timestampInSeconds = MICROSECONDS.toSeconds(timestamp)
+ val nanoSecondsPortion = (timestamp % MICROS_PER_SECOND) * NANOS_PER_MICROS
var result = timestampInSeconds
result <<= 30 // the nanosecond part fits in 30 bits
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index fe196ec..4094864 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.optimizer
+import java.util.concurrent.TimeUnit._
+
import scala.collection.mutable
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -65,7 +67,9 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
case CurrentDate(Some(timeZoneId)) =>
currentDates.getOrElseUpdate(timeZoneId, {
Literal.create(
- DateTimeUtils.millisToDays(timestamp / 1000L, DateTimeUtils.getTimeZone(timeZoneId)),
+ DateTimeUtils.millisToDays(
+ MICROSECONDS.toMillis(timestamp),
+ DateTimeUtils.getTimeZone(timeZoneId)),
DateType)
})
case CurrentTimestamp() => currentTime
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index 7a927e1..8441c2c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.plans.logical
+import java.util.concurrent.TimeUnit
+
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.unsafe.types.CalendarInterval
@@ -27,7 +29,7 @@ object EventTimeWatermark {
def getDelayMs(delay: CalendarInterval): Long = {
// We define month as `31 days` to simplify calculation.
- val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+ val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31
delay.milliseconds + delay.months * millisPerMonth
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
index 62f7541..e4d5fa9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
@@ -21,6 +21,8 @@ import scala.collection.JavaConverters._
import com.google.common.util.concurrent.AtomicLongMap
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.NANOS_PER_SECOND
+
case class QueryExecutionMetering() {
private val timeMap = AtomicLongMap.create[String]()
private val numRunsMap = AtomicLongMap.create[String]()
@@ -82,7 +84,7 @@ case class QueryExecutionMetering() {
s"""
|=== Metrics of Analyzer/Optimizer Rules ===
|Total number of runs: $totalNumRuns
- |Total time: ${totalTime / 1000000000D} seconds
+ |Total time: ${totalTime / NANOS_PER_SECOND.toDouble} seconds
|
|$colRuleName $colRunTime $colNumRuns
|$ruleMetrics
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index d714d29..627670a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -22,7 +22,7 @@ import java.time._
import java.time.Year.isLeap
import java.time.temporal.IsoFields
import java.util.{Locale, TimeZone}
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeUnit._
import scala.util.control.NonFatal
@@ -44,14 +44,18 @@ object DateTimeUtils {
// see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian
// it's 2440587.5, rounding up to compatible with Hive
final val JULIAN_DAY_OF_EPOCH = 2440588
- final val SECONDS_PER_DAY = 60 * 60 * 24L
- final val MICROS_PER_MILLIS = 1000L
- final val MICROS_PER_SECOND = MICROS_PER_MILLIS * MILLIS_PER_SECOND
- final val MILLIS_PER_SECOND = 1000L
- final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
- final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY
- final val NANOS_PER_MICROS = 1000L
- final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
+
+ final val NANOS_PER_MICROS = MICROSECONDS.toNanos(1)
+ final val NANOS_PER_MILLIS = MILLISECONDS.toNanos(1)
+ final val NANOS_PER_SECOND = SECONDS.toNanos(1)
+ final val MICROS_PER_MILLIS = MILLISECONDS.toMicros(1)
+ final val MICROS_PER_SECOND = SECONDS.toMicros(1)
+ final val MICROS_PER_DAY = DAYS.toMicros(1)
+ final val MILLIS_PER_SECOND = SECONDS.toMillis(1)
+ final val MILLIS_PER_MINUTE = MINUTES.toMillis(1)
+ final val MILLIS_PER_HOUR = HOURS.toMillis(1)
+ final val MILLIS_PER_DAY = DAYS.toMillis(1)
+ final val SECONDS_PER_DAY = DAYS.toSeconds(1)
// number of days between 1.1.1970 and 1.1.2001
final val to2001 = -11323
@@ -133,8 +137,8 @@ object DateTimeUtils {
micros += MICROS_PER_SECOND
seconds -= 1
}
- val t = new Timestamp(seconds * 1000)
- t.setNanos(micros.toInt * 1000)
+ val t = new Timestamp(SECONDS.toMillis(seconds))
+ t.setNanos(MICROSECONDS.toNanos(micros).toInt)
t
}
@@ -143,7 +147,7 @@ object DateTimeUtils {
*/
def fromJavaTimestamp(t: Timestamp): SQLTimestamp = {
if (t != null) {
- t.getTime() * 1000L + (t.getNanos().toLong / 1000) % 1000L
+ MILLISECONDS.toMicros(t.getTime()) + NANOSECONDS.toMicros(t.getNanos()) % NANOS_PER_MICROS
} else {
0L
}
@@ -156,7 +160,7 @@ object DateTimeUtils {
def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = {
// use Long to avoid rounding errors
val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
- seconds * MICROS_PER_SECOND + nanoseconds / 1000L
+ SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
}
/**
@@ -168,7 +172,7 @@ object DateTimeUtils {
val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
val day = julian_us / MICROS_PER_DAY
val micros = julian_us % MICROS_PER_DAY
- (day.toInt, micros * 1000L)
+ (day.toInt, MICROSECONDS.toNanos(micros))
}
/*
@@ -186,7 +190,7 @@ object DateTimeUtils {
* Converts milliseconds since epoch to SQLTimestamp.
*/
def fromMillis(millis: Long): SQLTimestamp = {
- millis * MICROS_PER_MILLIS
+ MILLISECONDS.toMicros(millis)
}
/**
@@ -329,7 +333,7 @@ object DateTimeUtils {
val sign = if (tz.get.toChar == '-') -1 else 1
ZoneId.ofOffset("GMT", ZoneOffset.ofHoursMinutes(sign * segments(7), sign * segments(8)))
}
- val nanoseconds = TimeUnit.MICROSECONDS.toNanos(segments(6))
+ val nanoseconds = MICROSECONDS.toNanos(segments(6))
val localTime = LocalTime.of(segments(3), segments(4), segments(5), nanoseconds.toInt)
val localDate = if (justTime) {
LocalDate.now(zoneId)
@@ -346,8 +350,8 @@ object DateTimeUtils {
}
def instantToMicros(instant: Instant): Long = {
- val sec = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND)
- val result = Math.addExact(sec, instant.getNano / NANOS_PER_MICROS)
+ val us = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND)
+ val result = Math.addExact(us, NANOSECONDS.toMicros(instant.getNano))
result
}
@@ -420,14 +424,15 @@ object DateTimeUtils {
}
private def localTimestamp(microsec: SQLTimestamp, timeZone: TimeZone): SQLTimestamp = {
- absoluteMicroSecond(microsec) + timeZone.getOffset(microsec / 1000) * 1000L
+ val zoneOffsetUs = MILLISECONDS.toMicros(timeZone.getOffset(MICROSECONDS.toMillis(microsec)))
+ absoluteMicroSecond(microsec) + zoneOffsetUs
}
/**
* Returns the hour value of a given timestamp value. The timestamp is expressed in microseconds.
*/
def getHours(microsec: SQLTimestamp, timeZone: TimeZone): Int = {
- ((localTimestamp(microsec, timeZone) / MICROS_PER_SECOND / 3600) % 24).toInt
+ (MICROSECONDS.toHours(localTimestamp(microsec, timeZone)) % 24).toInt
}
/**
@@ -435,7 +440,7 @@ object DateTimeUtils {
* microseconds.
*/
def getMinutes(microsec: SQLTimestamp, timeZone: TimeZone): Int = {
- ((localTimestamp(microsec, timeZone) / MICROS_PER_SECOND / 60) % 60).toInt
+ (MICROSECONDS.toMinutes(localTimestamp(microsec, timeZone)) % 60).toInt
}
/**
@@ -443,7 +448,7 @@ object DateTimeUtils {
* microseconds.
*/
def getSeconds(microsec: SQLTimestamp, timeZone: TimeZone): Int = {
- ((localTimestamp(microsec, timeZone) / MICROS_PER_SECOND) % 60).toInt
+ (MICROSECONDS.toSeconds(localTimestamp(microsec, timeZone)) % 60).toInt
}
/**
@@ -560,10 +565,10 @@ object DateTimeUtils {
months: Int,
microseconds: Long,
timeZone: TimeZone): SQLTimestamp = {
- val days = millisToDays(start / 1000L, timeZone)
+ val days = millisToDays(MICROSECONDS.toMillis(start), timeZone)
val newDays = dateAddMonths(days, months)
start +
- daysToMillis(newDays, timeZone) * 1000L - daysToMillis(days, timeZone) * 1000L +
+ MILLISECONDS.toMicros(daysToMillis(newDays, timeZone) - daysToMillis(days, timeZone)) +
microseconds
}
@@ -582,8 +587,8 @@ object DateTimeUtils {
time2: SQLTimestamp,
roundOff: Boolean,
timeZone: TimeZone): Double = {
- val millis1 = time1 / 1000L
- val millis2 = time2 / 1000L
+ val millis1 = MICROSECONDS.toMillis(time1)
+ val millis2 = MICROSECONDS.toMillis(time2)
val date1 = millisToDays(millis1, timeZone)
val date2 = millisToDays(millis2, timeZone)
val (year1, monthInYear1, dayInMonth1, daysToMonthEnd1) = splitDate(date1)
@@ -599,12 +604,11 @@ object DateTimeUtils {
}
// using milliseconds can cause precision loss with more than 8 digits
// we follow Hive's implementation which uses seconds
- val secondsInDay1 = (millis1 - daysToMillis(date1, timeZone)) / 1000L
- val secondsInDay2 = (millis2 - daysToMillis(date2, timeZone)) / 1000L
+ val secondsInDay1 = MILLISECONDS.toSeconds(millis1 - daysToMillis(date1, timeZone))
+ val secondsInDay2 = MILLISECONDS.toSeconds(millis2 - daysToMillis(date2, timeZone))
val secondsDiff = (dayInMonth1 - dayInMonth2) * SECONDS_PER_DAY + secondsInDay1 - secondsInDay2
- // 2678400D is the number of seconds in 31 days
- // every month is considered to be 31 days long in this function
- val diff = monthDiff + secondsDiff / 2678400D
+ val secondsInMonth = DAYS.toSeconds(31)
+ val diff = monthDiff + secondsDiff / secondsInMonth.toDouble
if (roundOff) {
// rounding to 8 digits
math.round(diff * 1e8) / 1e8
@@ -688,7 +692,7 @@ object DateTimeUtils {
* Trunc level should be generated using `parseTruncLevel()`, should be between 1 and 8
*/
def truncTimestamp(t: SQLTimestamp, level: Int, timeZone: TimeZone): SQLTimestamp = {
- var millis = t / MICROS_PER_MILLIS
+ var millis = MICROSECONDS.toMillis(t)
val truncated = level match {
case TRUNC_TO_YEAR =>
val dDays = millisToDays(millis, timeZone)
@@ -699,13 +703,13 @@ object DateTimeUtils {
case TRUNC_TO_DAY =>
val offset = timeZone.getOffset(millis)
millis += offset
- millis - millis % (MILLIS_PER_SECOND * SECONDS_PER_DAY) - offset
+ millis - millis % MILLIS_PER_DAY - offset
case TRUNC_TO_HOUR =>
val offset = timeZone.getOffset(millis)
millis += offset
- millis - millis % (60 * 60 * MILLIS_PER_SECOND) - offset
+ millis - millis % MILLIS_PER_HOUR - offset
case TRUNC_TO_MINUTE =>
- millis - millis % (60 * MILLIS_PER_SECOND)
+ millis - millis % MILLIS_PER_MINUTE
case TRUNC_TO_SECOND =>
millis - millis % MILLIS_PER_SECOND
case TRUNC_TO_WEEK =>
@@ -761,8 +765,8 @@ object DateTimeUtils {
if (guess != offset) {
// fallback to do the reverse lookup using java.time.LocalDateTime
// this should only happen near the start or end of DST
- val localDate = LocalDate.ofEpochDay(TimeUnit.MILLISECONDS.toDays(millisLocal))
- val localTime = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(
+ val localDate = LocalDate.ofEpochDay(MILLISECONDS.toDays(millisLocal))
+ val localTime = LocalTime.ofNanoOfDay(MILLISECONDS.toNanos(
Math.floorMod(millisLocal, MILLIS_PER_DAY)))
val localDateTime = LocalDateTime.of(localDate, localTime)
val millisEpoch = localDateTime.atZone(tz.toZoneId).toInstant.toEpochMilli
@@ -787,15 +791,19 @@ object DateTimeUtils {
ts
} else {
// get the human time using local time zone, that actually is in fromZone.
- val localTs = ts + localZone.getOffset(ts / 1000L) * 1000L // in fromZone
- localTs - getOffsetFromLocalMillis(localTs / 1000L, fromZone) * 1000L
+ val localZoneOffsetMs = localZone.getOffset(MICROSECONDS.toMillis(ts))
+ val localTsUs = ts + MILLISECONDS.toMicros(localZoneOffsetMs) // in fromZone
+ val offsetFromLocalMs = getOffsetFromLocalMillis(MICROSECONDS.toMillis(localTsUs), fromZone)
+ localTsUs - MILLISECONDS.toMicros(offsetFromLocalMs)
}
if (toZone.getID == localZone.getID) {
utcTs
} else {
- val localTs = utcTs + toZone.getOffset(utcTs / 1000L) * 1000L // in toZone
+ val toZoneOffsetMs = toZone.getOffset(MICROSECONDS.toMillis(utcTs))
+ val localTsUs = utcTs + MILLISECONDS.toMicros(toZoneOffsetMs) // in toZone
// treat it as local timezone, convert to UTC (we could get the expected human time back)
- localTs - getOffsetFromLocalMillis(localTs / 1000L, localZone) * 1000L
+ val offsetFromLocalMs = getOffsetFromLocalMillis(MICROSECONDS.toMillis(localTsUs), localZone)
+ localTsUs - MILLISECONDS.toMicros(offsetFromLocalMs)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 7f5860e..a5dbc75 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -125,18 +125,6 @@ package object util extends Logging {
new String(out.toByteArray, StandardCharsets.UTF_8)
}
- def stringOrNull(a: AnyRef): String = if (a == null) null else a.toString
-
- def benchmark[A](f: => A): A = {
- val startTime = System.nanoTime()
- val ret = f
- val endTime = System.nanoTime()
- // scalastyle:off println
- println(s"${(endTime - startTime).toDouble / 1000000}ms")
- // scalastyle:on println
- ret
- }
-
// Replaces attributes, string literals, complex type extractors with their pretty form so that
// generated column names don't contain back-ticks or double-quotes.
def usePrettyExpression(e: Expression): Expression = e transform {
@@ -158,7 +146,6 @@ package object util extends Logging {
def toPrettySQL(e: Expression): String = usePrettyExpression(e).sql
-
def escapeSingleQuotedString(str: String): String = {
val builder = StringBuilder.newBuilder
@@ -203,11 +190,4 @@ package object util extends Logging {
def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = {
truncatedString(seq, "", sep, "", maxFields)
}
-
- /* FIX ME
- implicit class debugLogging(a: Any) {
- def debugLogging() {
- org.apache.log4j.Logger.getLogger(a.getClass.getName).setLevel(org.apache.log4j.Level.DEBUG)
- }
- } */
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index 11956e1..d812504 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -18,9 +18,8 @@
package org.apache.spark.sql.catalyst.expressions
import java.sql.{Date, Timestamp}
-import java.util.{Calendar, Locale, TimeZone}
-
-import scala.util.Random
+import java.util.{Calendar, TimeZone}
+import java.util.concurrent.TimeUnit._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
@@ -29,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCoercion.numericPrecedence
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -321,13 +320,13 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(
cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId), TimestampType, timeZoneId),
- c.getTimeInMillis * 1000)
+ MILLISECONDS.toMicros(c.getTimeInMillis))
c = Calendar.getInstance(TimeZoneGMT)
c.set(2015, 10, 1, 2, 30, 0)
checkEvaluation(
cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId), TimestampType, timeZoneId),
- c.getTimeInMillis * 1000)
+ MILLISECONDS.toMicros(c.getTimeInMillis))
}
val gmtId = Option("GMT")
@@ -522,17 +521,17 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(ts, FloatType), 15.003f)
checkEvaluation(cast(ts, DoubleType), 15.003)
checkEvaluation(cast(cast(tss, ShortType), TimestampType),
- DateTimeUtils.fromJavaTimestamp(ts) * 1000)
+ DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
- DateTimeUtils.fromJavaTimestamp(ts) * 1000)
+ DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
checkEvaluation(cast(cast(tss, LongType), TimestampType),
- DateTimeUtils.fromJavaTimestamp(ts) * 1000)
+ DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
checkEvaluation(
- cast(cast(millis.toFloat / 1000, TimestampType), FloatType),
- millis.toFloat / 1000)
+ cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType),
+ millis.toFloat / MILLIS_PER_SECOND)
checkEvaluation(
- cast(cast(millis.toDouble / 1000, TimestampType), DoubleType),
- millis.toDouble / 1000)
+ cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), DoubleType),
+ millis.toDouble / MILLIS_PER_SECOND)
checkEvaluation(
cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
Decimal(1))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index ce576ec..8bec32d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeUnit._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -705,14 +706,14 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
1000L)
checkEvaluation(
UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
- DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L)
+ MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz)))
checkEvaluation(
UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2), timeZoneId),
-1000L)
checkEvaluation(UnixTimestamp(
Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
- DateTimeUtils.daysToMillis(
- DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz) / 1000L)
+ MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
+ DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz)))
val t1 = UnixTimestamp(
CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
val t2 = UnixTimestamp(
@@ -727,7 +728,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
null)
checkEvaluation(
UnixTimestamp(Literal(date1), Literal.create(null, StringType), timeZoneId),
- DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L)
+ MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz)))
checkEvaluation(
UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null)
}
@@ -759,14 +760,14 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
1000L)
checkEvaluation(
ToUnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
- DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L)
+ MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz)))
checkEvaluation(
ToUnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2), timeZoneId),
-1000L)
checkEvaluation(ToUnixTimestamp(
Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
- DateTimeUtils.daysToMillis(
- DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz) / 1000L)
+ MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
+ DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz)))
val t1 = ToUnixTimestamp(
CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
val t2 = ToUnixTimestamp(
@@ -780,7 +781,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
null)
checkEvaluation(ToUnixTimestamp(
Literal(date1), Literal.create(null, StringType), timeZoneId),
- DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L)
+ MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz)))
checkEvaluation(
ToUnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a7bd2ef..f6fab76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import java.io.Closeable
+import java.util.concurrent.TimeUnit._
import java.util.concurrent.atomic.AtomicReference
import scala.collection.JavaConverters._
@@ -690,7 +691,7 @@ class SparkSession private(
val ret = f
val end = System.nanoTime()
// scalastyle:off println
- println(s"Time taken: ${(end - start) / 1000 / 1000} ms")
+ println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms")
// scalastyle:on println
ret
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f852a52..3aed2ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -17,17 +17,18 @@
package org.apache.spark.sql.execution
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import java.util.concurrent.TimeUnit._
+
+import scala.collection.mutable.HashMap
import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
@@ -180,7 +181,8 @@ case class FileSourceScanExec(
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
driverMetrics("numFiles") = ret.map(_.files.size.toLong).sum
- val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
+ val timeTakenMs = NANOSECONDS.toMillis(
+ (System.nanoTime() - startTime) + optimizerMetadataTimeNs)
driverMetrics("metadataTime") = timeTakenMs
ret
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index f1470e4..0a955d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import java.util.concurrent.TimeUnit._
+
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
@@ -24,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.execution.metric.SQLMetrics
/**
@@ -106,7 +109,7 @@ case class SortExec(
// figure out how many bytes we spilled for this operator.
val spillSizeBefore = metrics.memoryBytesSpilled
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
- sortTime += sorter.getSortTimeNanos / 1000000
+ sortTime += NANOSECONDS.toMillis(sorter.getSortTimeNanos)
peakMemory += sorter.getPeakMemoryUsage
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
@@ -157,7 +160,7 @@ case class SortExec(
| long $spillSizeBefore = $metrics.memoryBytesSpilled();
| $addToSorterFuncName();
| $sortedIterator = $sorterVariable.sort();
- | $sortTime.add($sorterVariable.getSortTimeNanos() / 1000000);
+ | $sortTime.add($sorterVariable.getSortTimeNanos() / $NANOS_PER_MILLIS);
| $peakMemory.add($sorterVariable.getPeakMemoryUsage());
| $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
| $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 23ae1f0..25ff658 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.aggregate
+import java.util.concurrent.TimeUnit._
+
import org.apache.spark.TaskContext
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.rdd.RDD
@@ -28,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -135,7 +138,7 @@ case class HashAggregateExec(
aggregationIterator
}
}
- aggTime += (System.nanoTime() - beforeAgg) / 1000000
+ aggTime += NANOSECONDS.toMillis(System.nanoTime() - beforeAgg)
res
}
}
@@ -240,7 +243,7 @@ case class HashAggregateExec(
| $initAgg = true;
| long $beforeAgg = System.nanoTime();
| $doAggFuncName();
- | $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
+ | $aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS);
|
| // output the result
| ${genResult.trim}
@@ -726,7 +729,7 @@ case class HashAggregateExec(
$initAgg = true;
long $beforeAgg = System.nanoTime();
$doAggFuncName();
- $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
+ $aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS);
}
// output the result
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
index 5b340ee..151da24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.aggregate
+import java.util.concurrent.TimeUnit._
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
@@ -132,7 +134,7 @@ case class ObjectHashAggregateExec(
aggregationIterator
}
}
- aggTime += (System.nanoTime() - beforeAgg) / 1000000
+ aggTime += NANOSECONDS.toMillis(System.nanoTime() - beforeAgg)
res
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 4352721..eacd35b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import java.util.concurrent.TimeUnit._
+
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
@@ -684,7 +686,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
// Note that we use .executeCollect() because we don't want to convert data to Scala types
val rows: Array[InternalRow] = child.executeCollect()
val beforeBuild = System.nanoTime()
- longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
+ longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - beforeCollect)
val dataSize = rows.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
longMetric("dataSize") += dataSize
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 096481f..bcd8908 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.command
import java.util.Locale
+import java.util.concurrent.TimeUnit._
import scala.collection.{GenMap, GenSeq}
import scala.collection.parallel.ForkJoinTaskSupport
@@ -739,7 +740,7 @@ case class AlterTableRecoverPartitionsCommand(
// do this in parallel.
val batchSize = 100
partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
- val now = System.currentTimeMillis() / 1000
+ val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
val parts = batch.map { case (spec, location) =>
val params = partitionStats.get(location.toString).map {
case PartitionStatistics(numFiles, totalSize) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index d55d4fa..b9972b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -83,7 +83,7 @@ case class BroadcastExchangeExec(
}
val beforeBuild = System.nanoTime()
- longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
+ longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - beforeCollect)
// Construct the relation.
val relation = mode.transform(input, Some(numRows))
@@ -105,11 +105,11 @@ case class BroadcastExchangeExec(
}
val beforeBroadcast = System.nanoTime()
- longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
+ longMetric("buildTime") += NANOSECONDS.toMillis(beforeBroadcast - beforeBuild)
// Broadcast the relation
val broadcasted = sparkContext.broadcast(relation)
- longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+ longMetric("broadcastTime") += NANOSECONDS.toMillis(System.nanoTime() - beforeBroadcast)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
broadcasted
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 524804d..a8361fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.joins
+import java.util.concurrent.TimeUnit._
+
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -53,7 +55,7 @@ case class ShuffledHashJoinExec(
val start = System.nanoTime()
val context = TaskContext.get()
val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
- buildTime += (System.nanoTime() - start) / 1000000
+ buildTime += NANOSECONDS.toMillis(System.nanoTime() - start)
buildDataSize += relation.estimatedSize
// This relation is usually used until the end of task.
context.addTaskCompletionListener[Unit](_ => relation.close())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 6fa7ee0..6d1131e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.unsafe.types.CalendarInterval
@@ -99,7 +100,7 @@ case class EventTimeWatermarkExec(
child.execute().mapPartitions { iter =>
val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
iter.map { row =>
- eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
+ eventTimeStats.add(getEventTime(row).getLong(0) / MICROS_PER_MILLIS)
row
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 43b70ae..cef814b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -18,8 +18,7 @@
package org.apache.spark.sql.execution.streaming
import java.net.URI
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.TimeUnit._
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -237,7 +236,7 @@ class FileStreamSource(
(status.getPath.toUri.toString, status.getModificationTime)
}
val endTime = System.nanoTime
- val listingTimeMs = (endTime.toDouble - startTime) / 1000000
+ val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime)
if (listingTimeMs > 2000) {
// Output a warning when listing files uses more than 2 seconds.
logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
index 7f65e3e..fcb230b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.streaming
import java.sql.Date
+import java.util.concurrent.TimeUnit
import org.apache.commons.lang3.StringUtils
@@ -178,7 +179,7 @@ private[sql] class GroupStateImpl[S] private(
throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
}
- val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+ val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31
cal.milliseconds + cal.months * millisPerMonth
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 2528351..859c327 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
@@ -87,7 +87,7 @@ trait ProgressReporter extends Logging {
private var lastNoDataProgressEventTime = Long.MinValue
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
- timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+ timestampFormat.setTimeZone(getTimeZone("UTC"))
@volatile
protected var currentStatus: StreamingQueryStatus = {
@@ -147,10 +147,10 @@ trait ProgressReporter extends Logging {
val executionStats = extractExecutionStats(hasNewData)
val processingTimeSec =
- (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000
+ (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
- (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 1000
+ (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
} else {
Double.NaN
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
index caffcc3..fd0ff31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
@@ -53,7 +53,7 @@ private[sql] object ContinuousTrigger {
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
- new ContinuousTrigger(cal.microseconds / 1000)
+ new ContinuousTrigger(TimeUnit.MICROSECONDS.toMillis(cal.microseconds))
}
def apply(interval: Duration): ContinuousTrigger = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
index 236bd55..38b0776 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
@@ -91,7 +91,7 @@ object ProcessingTime {
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
- new ProcessingTime(cal.microseconds / 1000)
+ new ProcessingTime(TimeUnit.MICROSECONDS.toMillis(cal.microseconds))
}
/**
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 62bb72d..b06d52d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.Locale
+import java.util.concurrent.TimeUnit
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions._
@@ -515,6 +516,8 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new Timestamp(-1000000)))))
}
+ private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis)
+
test("unix_timestamp") {
val date1 = Date.valueOf("2015-07-24")
val date2 = Date.valueOf("2015-07-25")
@@ -527,21 +530,21 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
val fmt = "yyyy/MM/dd HH:mm:ss.S"
val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss")
checkAnswer(df.select(unix_timestamp(col("ts"))), Seq(
- Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
checkAnswer(df.select(unix_timestamp(col("ss"))), Seq(
- Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
checkAnswer(df.select(unix_timestamp(col("d"), fmt)), Seq(
- Row(date1.getTime / 1000L), Row(date2.getTime / 1000L)))
+ Row(secs(date1.getTime)), Row(secs(date2.getTime))))
checkAnswer(df.select(unix_timestamp(col("s"), fmt)), Seq(
- Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
checkAnswer(df.selectExpr("unix_timestamp(ts)"), Seq(
- Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
checkAnswer(df.selectExpr("unix_timestamp(ss)"), Seq(
- Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
checkAnswer(df.selectExpr(s"unix_timestamp(d, '$fmt')"), Seq(
- Row(date1.getTime / 1000L), Row(date2.getTime / 1000L)))
+ Row(secs(date1.getTime)), Row(secs(date2.getTime))))
checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq(
- Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
val x1 = "2015-07-24 10:00:00"
val x2 = "2015-25-07 02:02:02"
@@ -552,13 +555,13 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
val df1 = Seq(x1, x2, x3, x4).toDF("x")
checkAnswer(df1.select(unix_timestamp(col("x"))), Seq(
- Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+ Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
checkAnswer(df1.selectExpr("unix_timestamp(x)"), Seq(
- Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+ Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
checkAnswer(df1.select(unix_timestamp(col("x"), "yyyy-dd-MM HH:mm:ss")), Seq(
- Row(null), Row(ts2.getTime / 1000L), Row(null), Row(null)))
+ Row(null), Row(secs(ts2.getTime)), Row(null), Row(null)))
checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq(
- Row(ts4.getTime / 1000L), Row(null), Row(ts3.getTime / 1000L), Row(null)))
+ Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null)))
// invalid format
checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd aa:HH:ss')"), Seq(
@@ -570,10 +573,12 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
val ts5 = Timestamp.valueOf("2016-02-29 00:00:00")
val df2 = Seq(y1, y2).toDF("y")
checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(
- Row(ts5.getTime / 1000L), Row(null)))
+ Row(secs(ts5.getTime)), Row(null)))
val now = sql("select unix_timestamp()").collect().head.getLong(0)
- checkAnswer(sql(s"select cast ($now as timestamp)"), Row(new java.util.Date(now * 1000)))
+ checkAnswer(
+ sql(s"select cast ($now as timestamp)"),
+ Row(new java.util.Date(TimeUnit.SECONDS.toMillis(now))))
}
test("to_unix_timestamp") {
@@ -588,13 +593,13 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
val fmt = "yyyy/MM/dd HH:mm:ss.S"
val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss")
checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq(
- Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq(
- Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq(
- Row(date1.getTime / 1000L), Row(date2.getTime / 1000L)))
+ Row(secs(date1.getTime)), Row(secs(date2.getTime))))
checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq(
- Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+ Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
val x1 = "2015-07-24 10:00:00"
val x2 = "2015-25-07 02:02:02"
@@ -605,9 +610,9 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
val df1 = Seq(x1, x2, x3, x4).toDF("x")
checkAnswer(df1.selectExpr("to_unix_timestamp(x)"), Seq(
- Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+ Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq(
- Row(ts4.getTime / 1000L), Row(null), Row(ts3.getTime / 1000L), Row(null)))
+ Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null)))
// february
val y1 = "2016-02-29"
@@ -615,7 +620,7 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
val ts5 = Timestamp.valueOf("2016-02-29 00:00:00")
val df2 = Seq(y1, y2).toDF("y")
checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(
- Row(ts5.getTime / 1000L), Row(null)))
+ Row(secs(ts5.getTime)), Row(null)))
// invalid format
checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd bb:HH:ss')"), Seq(
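Aside on the secs helper used throughout this suite: it is just a named wrapper for the millis-to-seconds truncation the tests previously wrote as / 1000L. A small sketch of the equivalence; the SecsHelperSketch object and its sample timestamp are illustrative only:

  import java.sql.Timestamp
  import java.util.concurrent.TimeUnit

  object SecsHelperSketch {
    // Same shape as the helper added to DateFunctionsSuite above.
    private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis)

    def main(args: Array[String]): Unit = {
      val ts = Timestamp.valueOf("2015-07-24 10:00:00.3")
      // Truncating division and the TimeUnit conversion agree for non-negative epoch millis.
      assert(secs(ts.getTime) == ts.getTime / 1000L)
      println(s"epoch seconds: ${secs(ts.getTime)}")
    }
  }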
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index 33c65d7..e1769fb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import java.nio.channels.ServerSocketChannel
import java.sql.Timestamp
import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.TimeUnit._
import scala.collection.JavaConverters._
@@ -29,7 +30,6 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
@@ -168,7 +168,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
// Timestamp for rate stream is round to second which leads to milliseconds lost, that will
// make batch1stamp smaller than current timestamp if both of them are in the same second.
// Comparing by second to make sure the correct behavior.
- assert(batch1Stamp.getTime >= curr / 1000 * 1000)
+ assert(batch1Stamp.getTime >= SECONDS.toMillis(MILLISECONDS.toSeconds(curr)))
assert(!batch2Stamp.before(batch1Stamp))
}
}
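For context on the assertion above: composing MILLISECONDS.toSeconds with SECONDS.toMillis reproduces the old curr / 1000 * 1000 round-down-to-second idiom. A self-contained sketch, with an arbitrary sample value:

  import java.util.concurrent.TimeUnit.{MILLISECONDS, SECONDS}

  object RoundDownToSecondSketch {
    def main(args: Array[String]): Unit = {
      val curr = 1551200123456L // some epoch millis with a sub-second component
      val viaMagicConstants = curr / 1000 * 1000
      val viaTimeUnit = SECONDS.toMillis(MILLISECONDS.toSeconds(curr))
      assert(viaMagicConstants == viaTimeUnit) // both drop the trailing 456 ms
      println(viaTimeUnit)
    }
  }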
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index b79770a..1ff9dec 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -21,6 +21,7 @@ import java.{util => ju}
import java.io.File
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Locale}
+import java.util.concurrent.TimeUnit._
import org.apache.commons.io.FileUtils
import org.scalatest.{BeforeAndAfter, Matchers}
@@ -28,7 +29,6 @@ import org.scalatest.{BeforeAndAfter, Matchers}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Dataset}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.internal.SQLConf
@@ -347,12 +347,13 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
}
testStream(aggWithWatermark)(
- AddData(input, currentTimeMs / 1000),
+ AddData(input, MILLISECONDS.toSeconds(currentTimeMs)),
CheckAnswer(),
- AddData(input, currentTimeMs / 1000),
+ AddData(input, MILLISECONDS.toSeconds(currentTimeMs)),
CheckAnswer(),
assertEventStats { e =>
- assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000)
+ assert(timestampFormat.parse(e.get("max")).getTime ===
+ SECONDS.toMillis(MILLISECONDS.toSeconds((currentTimeMs))))
val watermarkTime = timestampFormat.parse(e.get("watermark"))
val monthDiff = monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime)
// monthsSinceEpoch is like `math.floor(num)`, so monthDiff has two possible values.
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 4dec2f7..178fced 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive
import java.lang.reflect.{ParameterizedType, Type, WildcardType}
+import java.util.concurrent.TimeUnit._
import scala.collection.JavaConverters._
@@ -460,7 +461,7 @@ private[hive] trait HiveInspectors {
_ => constant
case poi: WritableConstantTimestampObjectInspector =>
val t = poi.getWritableConstantValue
- val constant = t.getSeconds * 1000000L + t.getNanos / 1000L
+ val constant = SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
_ => constant
case poi: WritableConstantIntObjectInspector =>
val constant = poi.getWritableConstantValue.get()
@@ -629,7 +630,7 @@ private[hive] trait HiveInspectors {
data: Any => {
if (data != null) {
val t = x.getPrimitiveWritableObject(data)
- t.getSeconds * 1000000L + t.getNanos / 1000L
+ SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
} else {
null
}
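Worth noting for the Hive conversions above: a Hive timestamp writable exposes whole seconds plus a nanosecond remainder, and Spark's internal timestamps are microseconds, hence seconds-to-micros plus nanos-to-micros. A hedged sketch with plain numbers standing in for the writable's getSeconds/getNanos accessors (the sample values are illustrative):

  import java.util.concurrent.TimeUnit.{NANOSECONDS, SECONDS}

  object HiveTimestampToMicrosSketch {
    // seconds/nanos mimic the writable's getSeconds and getNanos accessors.
    def toMicros(seconds: Long, nanos: Int): Long =
      SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanos)

    def main(args: Array[String]): Unit = {
      val micros = toMicros(seconds = 1551200123L, nanos = 456789000)
      // Old spelling: seconds * 1000000L + nanos / 1000L
      assert(micros == 1551200123L * 1000000L + 456789000 / 1000L)
      println(micros)
    }
  }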
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bfe19c2..77ac606 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.{File, PrintStream}
import java.lang.{Iterable => JIterable}
import java.util.{Locale, Map => JMap}
+import java.util.concurrent.TimeUnit._
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -948,8 +949,8 @@ private[hive] object HiveClientImpl {
hiveTable.setFields(schema.asJava)
hiveTable.setPartCols(partCols.asJava)
userName.foreach(hiveTable.setOwner)
- hiveTable.setCreateTime((table.createTime / 1000).toInt)
- hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
+ hiveTable.setCreateTime(MILLISECONDS.toSeconds(table.createTime).toInt)
+ hiveTable.setLastAccessTime(MILLISECONDS.toSeconds(table.lastAccessTime).toInt)
table.storage.locationUri.map(CatalogUtils.URIToString).foreach { loc =>
hiveTable.getTTable.getSd.setLocation(loc)}
table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
@@ -1012,8 +1013,8 @@ private[hive] object HiveClientImpl {
tpart.setTableName(ht.getTableName)
tpart.setValues(partValues.asJava)
tpart.setSd(storageDesc)
- tpart.setCreateTime((p.createTime / 1000).toInt)
- tpart.setLastAccessTime((p.lastAccessTime / 1000).toInt)
+ tpart.setCreateTime(MILLISECONDS.toSeconds(p.createTime).toInt)
+ tpart.setLastAccessTime(MILLISECONDS.toSeconds(p.lastAccessTime).toInt)
tpart.setParameters(mutable.Map(p.parameters.toSeq: _*).asJava)
new HivePartition(ht, tpart)
}
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index a8ebb23..af5ea59 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -516,7 +516,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
f.className,
null,
PrincipalType.USER,
- (System.currentTimeMillis / 1000).toInt,
+ TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis).toInt,
FunctionType.JAVA,
resourceUris.asJava)
}
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
index 29cc1fa..342f20f 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -79,7 +79,7 @@ class RateLimitedOutputStream(out: OutputStream, desiredBytesPerSec: Int)
} else {
// Calculate how much time we should sleep to bring ourselves to the desired rate.
val targetTimeInMillis = bytesWrittenSinceSync * 1000 / desiredBytesPerSec
- val elapsedTimeInMillis = elapsedNanosecs / 1000000
+ val elapsedTimeInMillis = NANOSECONDS.toMillis(elapsedNanosecs)
val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
if (sleepTimeInMillis > 0) {
logTrace("Natural rate is " + rate + " per second but desired rate is
" +