This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d0d4419488 [SparkConnector] Add proper time column support for Spark connector segment writer (#14556)
d0d4419488 is described below

commit d0d4419488d519ab237da4467e9e290ffb0717aa
Author: Caner Balci <[email protected]>
AuthorDate: Mon Dec 2 16:42:53 2024 -0800

    [SparkConnector] Add proper time column support for Spark connector segment writer (#14556)
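
    A rough usage sketch of the new options (given some DataFrame df; the
    "pinot" format short name and the "tableName" option key are assumptions
    made for illustration, while the time options mirror
    ExampleSparkPinotConnectorWriteTest in this patch):

        df.write
          .format("pinot")                       // assumed connector short name
          .mode("append")
          .option("tableName", "airlineStats")   // assumed key for the target table
          .option("segmentNameFormat", "{table}_{startTime}_{endTime}_{partitionId:03}")
          .option("timeColumnName", "ts")
          .option("timeFormat", "EPOCH|SECONDS")
          .option("timeGranularity", "1:SECONDS")
          .save("myPath")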
---
 .../spark/v3/datasource/PinotDataWriter.scala      | 38 +++++++++++++-
 .../connector/spark/v3/datasource/PinotWrite.scala |  3 +-
 .../v3/datasource/SparkToPinotTypeTranslator.scala | 22 +++++---
 .../ExampleSparkPinotConnectorWriteTest.scala      |  2 +
 .../spark/v3/datasource/PinotDataWriterTest.scala  | 60 ++++++++++++++++------
 .../spark/v3/datasource/PinotWriteTest.scala       | 19 +++++--
 .../SparkToPinotTypeTranslatorTest.scala           | 20 +++++++-
 .../spark/common/PinotDataSourceWriteOptions.scala | 16 ++++++
 .../common/PinotDataSourceWriteOptionsTest.scala   |  8 ++-
 9 files changed, 154 insertions(+), 34 deletions(-)

diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
index 5a6a2a5ae3..41f86e4d2d 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
@@ -62,8 +62,29 @@ class PinotDataWriter[InternalRow](
   private[pinot] val savePath: String = writeOptions.savePath
   private[pinot] val bufferedRecordReader: PinotBufferedRecordReader = new PinotBufferedRecordReader()
 
+  private val timeColumnName = writeOptions.timeColumnName
+  private val timeColumnIndex = if (timeColumnName != null) writeSchema.fieldIndex(timeColumnName) else -1
+
+  private var isTimeColumnNumeric = false
+  if (timeColumnIndex > -1) {
+    isTimeColumnNumeric = writeSchema.fields(timeColumnIndex).dataType match {
+      case org.apache.spark.sql.types.IntegerType => true
+      case org.apache.spark.sql.types.LongType => true
+      case _ => false
+    }
+  }
+  private var startTime = Long.MaxValue
+  private var endTime = 0L
+
   override def write(record: catalyst.InternalRow): Unit = {
     bufferedRecordReader.write(internalRowToGenericRow(record))
+
+    // Tracking startTime and endTime for segment name generation purposes
+    if (timeColumnIndex > -1 && isTimeColumnNumeric) {
+      val time = record.getLong(timeColumnIndex)
+      startTime = Math.min(startTime, time)
+      endTime = Math.max(endTime, time)
+    }
   }
 
   override def commit(): WriterCommitMessage = {
@@ -77,7 +98,10 @@ class PinotDataWriter[InternalRow](
   /** This method is used to generate the segment name based on the format
    *  provided in the write options (segmentNameFormat).
    *  The format can contain variables like {partitionId}.
-   *  Currently supported variables are `partitionId`, `table`
+   *  Currently supported variables are `partitionId`, `table`, `startTime` and `endTime`
+   *
+   *  `startTime` and `endTime` are the minimum and maximum values of the time column in the records,
+   *  and they are only available if the time column is numeric.
    *
    *  It also supports the following, python inspired format specifier for digit formatting:
    *  `{partitionId:05}`
@@ -88,12 +112,15 @@ class PinotDataWriter[InternalRow](
    *    "{partitionId:05}_{table}" -> "00012_airlineStats"
    *    "{table}_{partitionId}" -> "airlineStats_12"
    *    "{table}_20240805" -> "airlineStats_20240805"
+   *    "{table}_{startTime}_{endTime}_{partitionId:03}" -> 
"airlineStats_1234567890_1234567891_012"
    */
   private[pinot] def getSegmentName: String = {
     val format = writeOptions.segmentNameFormat
     val variables = Map(
       "partitionId" -> partitionId,
       "table" -> tableName,
+      "startTime" -> startTime,
+      "endTime" -> endTime,
     )
 
     val pattern = Pattern.compile("\\{(\\w+)(?::(\\d+))?}")
@@ -145,11 +172,14 @@ class PinotDataWriter[InternalRow](
                                          indexingConfig: IndexingConfig,
                                          outputDir: File,
                                         ): SegmentGeneratorConfig = {
+    val segmentsValidationAndRetentionConfig = new SegmentsValidationAndRetentionConfig()
+    segmentsValidationAndRetentionConfig.setTimeColumnName(timeColumnName)
+
     // Mostly dummy tableConfig, sufficient for segment generation purposes
     val tableConfig = new TableConfig(
       tableName,
       "OFFLINE",
-      new SegmentsValidationAndRetentionConfig(),
+      segmentsValidationAndRetentionConfig,
       new TenantConfig(null, null, null),
       indexingConfig,
       new TableCustomConfig(null),
@@ -192,6 +222,8 @@ class PinotDataWriter[InternalRow](
           gr.putValue(field.name, record.getBoolean(idx))
         case org.apache.spark.sql.types.ByteType =>
           gr.putValue(field.name, record.getByte(idx))
+        case org.apache.spark.sql.types.BinaryType =>
+          gr.putValue(field.name, record.getBinary(idx))
         case org.apache.spark.sql.types.ShortType =>
           gr.putValue(field.name, record.getShort(idx))
         case org.apache.spark.sql.types.ArrayType(elementType, _) =>
@@ -210,6 +242,8 @@ class PinotDataWriter[InternalRow](
               gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Boolean]))
             case org.apache.spark.sql.types.ByteType =>
               gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Byte]))
+            case org.apache.spark.sql.types.BinaryType =>
+              gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Array[Byte]]))
             case org.apache.spark.sql.types.ShortType =>
               gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Short]))
             case _ =>
diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala
index 0c0e441686..678da35f5e 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala
@@ -28,7 +28,8 @@ class PinotWrite(
                 ) extends Write with BatchWrite {
   private[pinot] val writeOptions: PinotDataSourceWriteOptions = PinotDataSourceWriteOptions.from(logicalWriteInfo.options())
   private[pinot] val writeSchema: StructType = logicalWriteInfo.schema()
-  private[pinot] val pinotSchema: Schema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
+  private[pinot] val pinotSchema: Schema = SparkToPinotTypeTranslator.translate(
+    writeSchema, writeOptions.tableName, writeOptions.timeColumnName, writeOptions.timeFormat, writeOptions.timeGranularity)
 
   override def createBatchWriterFactory(physicalWriteInfo: PhysicalWriteInfo): DataWriterFactory = {
     // capture the values to allow lambda serialization
diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala
index 5584a02334..a26f923f94 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala
@@ -18,26 +18,32 @@
  */
 package org.apache.pinot.connector.spark.v3.datasource
 
-import org.apache.pinot.spi.data.FieldSpec
-import org.apache.pinot.spi.data.Schema
+import org.apache.pinot.spi.data.{FieldSpec, Schema}
 import org.apache.pinot.spi.data.Schema.SchemaBuilder
 import org.apache.spark.sql.types._
 
 
 object SparkToPinotTypeTranslator {
-  // TODO: incorporate time column
-  def translate(sparkSchema: StructType, tableName: String): Schema = {
+  def translate(sparkSchema: StructType,
+                tableName: String,
+                timeColumn: String,
+                timeFormat: String,
+                timeGranularity: String): Schema = {
     val schemaBuilder = new SchemaBuilder
     schemaBuilder.setSchemaName(tableName)
     for (field <- sparkSchema.fields) {
       val fieldName = field.name
       val sparkType = field.dataType
       val pinotType = translateType(sparkType)
+
       if (pinotType != null) {
-        if (sparkType.isInstanceOf[ArrayType]) {
-          schemaBuilder.addMultiValueDimension(fieldName, pinotType)
-        } else {
-          schemaBuilder.addSingleValueDimension(fieldName, pinotType)
+        (fieldName, sparkType) match {
+          case (`timeColumn`, _) =>
+            schemaBuilder.addDateTime(fieldName, pinotType, timeFormat, timeGranularity);
+          case (_, _: ArrayType) =>
+            schemaBuilder.addMultiValueDimension(fieldName, pinotType)
+          case _ =>
+            schemaBuilder.addSingleValueDimension(fieldName, pinotType)
         }
       }
       else throw new UnsupportedOperationException("Unsupported data type: " + sparkType)
diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala
index 45343a8999..64582c9df8 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala
@@ -55,6 +55,8 @@ object ExampleSparkPinotConnectorWriteTest extends Logging {
       .option("noDictionaryColumns", "airport,state")
       .option("bloomFilterColumns", "airport")
       .option("timeColumnName", "ts")
+      .option("timeFormat", "EPOCH|SECONDS")
+      .option("timeGranularity", "1:SECONDS")
       .save("myPath")
   }
 }
diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala
index 33149ef952..1653785456 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala
@@ -20,7 +20,7 @@ package org.apache.pinot.connector.spark.v3.datasource
 
 import org.apache.pinot.connector.spark.common.PinotDataSourceWriteOptions
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType, BinaryType}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.scalatest.matchers.should.Matchers
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -55,6 +55,8 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
       tableName = "testTable",
       savePath = "/tmp/pinot",
       timeColumnName = "ts",
+      timeFormat = "EPOCH|SECONDS",
+      timeGranularity = "1:SECONDS",
       segmentNameFormat = "{table}_{partitionId:03}",
       invertedIndexColumns = Array("name"),
       noDictionaryColumns = Array("age"),
@@ -63,14 +65,18 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
     )
     val writeSchema = StructType(Seq(
       StructField("name", StringType, nullable = false),
-      StructField("age", IntegerType, nullable = false)
+      StructField("age", IntegerType, nullable = false),
+      StructField("ts", LongType, nullable = false),
+      StructField("bin", BinaryType, nullable = false),
     ))
 
-    val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
+    val pinotSchema = SparkToPinotTypeTranslator.translate(
+      writeSchema, writeOptions.tableName, writeOptions.timeColumnName,
+      writeOptions.timeFormat, writeOptions.timeGranularity)
     val writer = new PinotDataWriter[InternalRow](0, 0, writeOptions, writeSchema, pinotSchema)
 
-    val record1 = new TestInternalRow(Array[Any]("Alice", 30))
-    val record2 = new TestInternalRow(Array[Any]("Bob", 25))
+    val record1 = new TestInternalRow(Array[Any]("Alice", 30, 1234567890L, "Alice".getBytes))
+    val record2 = new TestInternalRow(Array[Any]("Bob", 25, 1234567891L, "Bob".getBytes))
 
     writer.write(record1)
     writer.write(record2)
@@ -92,7 +98,9 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
       tableName = "testTable",
       savePath = tmpDir.getAbsolutePath,
       timeColumnName = "ts",
-      segmentNameFormat = "{table}_{partitionId:03}",
+      timeFormat = "EPOCH|SECONDS",
+      timeGranularity = "1:SECONDS",
+      segmentNameFormat = "{table}_{startTime}_{endTime}_{partitionId:03}",
       invertedIndexColumns = Array("name"),
       noDictionaryColumns = Array("age"),
       bloomFilterColumns = Array("name"),
@@ -100,26 +108,32 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
     )
     val writeSchema = StructType(Seq(
       StructField("name", StringType, nullable = false),
-      StructField("age", IntegerType, nullable = false)
+      StructField("age", IntegerType, nullable = false),
+      StructField("ts", LongType, nullable = false),
+      StructField("bin", BinaryType, nullable = false),
     ))
-    val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
+    val pinotSchema = SparkToPinotTypeTranslator.translate(
+      writeSchema, writeOptions.tableName, writeOptions.timeColumnName,
+      writeOptions.timeFormat, writeOptions.timeGranularity)
     val writer = new PinotDataWriter[InternalRow](0, 0, writeOptions, writeSchema, pinotSchema)
-    val record1 = new TestInternalRow(Array[Any]("Alice", 30))
+    val record1 = new TestInternalRow(Array[Any]("Alice", 30, 1234567890L, "Alice".getBytes))
     writer.write(record1)
+    val record2 = new TestInternalRow(Array[Any]("Bob", 25, 1234567891L, "Bob".getBytes))
+    writer.write(record2)
 
     val commitMessage: WriterCommitMessage = writer.commit()
     commitMessage shouldBe a[SuccessWriterCommitMessage]
 
     // Verify that the segment is created and stored in the target location
     val fs = FileSystem.get(new URI(writeOptions.savePath), new org.apache.hadoop.conf.Configuration())
-    val segmentPath = new Path(writeOptions.savePath + "/testTable_000.tar.gz")
+    val segmentPath = new Path(writeOptions.savePath + "/testTable_1234567890_1234567891_000.tar.gz")
     fs.exists(segmentPath) shouldBe true
 
     // Verify the contents of the segment tar file
     TarCompressionUtils.untar(
-      new File(writeOptions.savePath + "/testTable_000.tar.gz"),
+      new File(writeOptions.savePath + "/testTable_1234567890_1234567891_000.tar.gz"),
       new File(writeOptions.savePath))
-    val untarDir = Paths.get(writeOptions.savePath + "/testTable_000/v3/")
+    val untarDir = Paths.get(writeOptions.savePath + "/testTable_1234567890_1234567891_000/v3/")
     Files.exists(untarDir) shouldBe true
 
     val segmentFiles = Files.list(untarDir).toArray.map(_.toString)
@@ -132,14 +146,19 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
     val metadataSrc = Source.fromFile(untarDir + "/metadata.properties")
     val metadataContent = metadataSrc.getLines.mkString("\n")
     metadataSrc.close()
-    metadataContent should include ("segment.name = testTable_000")
+
+    metadataContent should include ("segment.name = testTable_1234567890_1234567891_000")
+    metadataContent should include ("segment.time.column.name = ts")
+    metadataContent should include ("segment.start.time = 1234567890")
+    metadataContent should include ("segment.end.time = 1234567891")
   }
 
   test("getSegmentName should format segment name correctly with custom 
format") {
     val testCases = Seq(
       ("{table}_{partitionId}", "airlineStats_12"),
       ("{partitionId:05}_{table}", "00012_airlineStats"),
-      ("{table}_20240805", "airlineStats_20240805")
+      ("{table}_20240805", "airlineStats_20240805"),
+      ("{table}_{startTime}_{endTime}_{partitionId:03}", 
"airlineStats_1234567890_1234567891_012"),
     )
 
     testCases.foreach { case (format, expected) =>
@@ -147,19 +166,26 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
         tableName = "airlineStats",
         savePath = "/tmp/pinot",
         timeColumnName = "ts",
+        timeFormat = "EPOCH|SECONDS",
+        timeGranularity = "1:SECONDS",
         segmentNameFormat = format,
         invertedIndexColumns = Array("name"),
         noDictionaryColumns = Array("age"),
         bloomFilterColumns = Array("name"),
-        rangeIndexColumns = Array()
+        rangeIndexColumns = Array(),
       )
       val writeSchema = StructType(Seq(
         StructField("name", StringType, nullable = false),
-        StructField("age", IntegerType, nullable = false)
+        StructField("age", IntegerType, nullable = false),
+        StructField("ts", LongType, nullable = false),
       ))
 
-      val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
+      val pinotSchema = SparkToPinotTypeTranslator.translate(
+        writeSchema, writeOptions.tableName, writeOptions.timeColumnName,
+        writeOptions.timeFormat, writeOptions.timeGranularity)
       val writer = new PinotDataWriter[InternalRow](12, 0, writeOptions, writeSchema, pinotSchema)
+      writer.write(new TestInternalRow(Array[Any]("Alice", 30, 1234567890L)))
+      writer.write(new TestInternalRow(Array[Any]("Bob", 25, 1234567891L)))
 
       val segmentName = writer.getSegmentName
 
diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala
index c9197a67b9..0a30f06529 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala
@@ -20,7 +20,7 @@ package org.apache.pinot.connector.spark.v3.datasource
 
 import org.apache.pinot.spi.data.Schema
 import org.apache.spark.sql.connector.write.LogicalWriteInfo
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
@@ -35,6 +35,8 @@ class PinotWriteTest extends AnyFunSuite with Matchers {
       "segmentNameFormat" -> "my_segment_format",
       "path" -> "/path/to/save",
       "timeColumnName" -> "timeCol",
+      "timeFormat" -> "EPOCH|SECONDS",
+      "timeGranularity" -> "1:SECONDS",
       "invertedIndexColumns" -> "col1,col2",
       "noDictionaryColumns" -> "col3,col4",
       "bloomFilterColumns" -> "col5,col6",
@@ -43,7 +45,8 @@ class PinotWriteTest extends AnyFunSuite with Matchers {
 
     val schema = StructType(Seq(
       StructField("col1", StringType),
-      StructField("col2", StringType)
+      StructField("col2", StringType),
+      StructField("timeCol", LongType),
     ))
 
     val logicalWriteInfo = new TestLogicalWriteInfo(new CaseInsensitiveStringMap(options.asJava), schema)
@@ -54,6 +57,8 @@ class PinotWriteTest extends AnyFunSuite with Matchers {
     pinotWrite.writeOptions.segmentNameFormat shouldEqual "my_segment_format"
     pinotWrite.writeOptions.savePath shouldEqual "/path/to/save"
     pinotWrite.writeOptions.timeColumnName shouldEqual "timeCol"
+    pinotWrite.writeOptions.timeFormat shouldEqual "EPOCH|SECONDS"
+    pinotWrite.writeOptions.timeGranularity shouldEqual "1:SECONDS"
     pinotWrite.writeOptions.invertedIndexColumns shouldEqual Array("col1", "col2")
     pinotWrite.writeOptions.noDictionaryColumns shouldEqual Array("col3", "col4")
     pinotWrite.writeOptions.bloomFilterColumns shouldEqual Array("col5", "col6")
@@ -68,7 +73,15 @@ class PinotWriteTest extends AnyFunSuite with Matchers {
         |  "dimensionFieldSpecs": [
         |    {"name": "col1", "dataType": "STRING"},
         |    {"name": "col2", "dataType": "STRING"}
-        |  ]
+        |  ],
+        |  "dateTimeFieldSpecs" : [ {
+        |    "name" : "timeCol",
+        |    "dataType" : "LONG",
+        |    "fieldType" : "DATE_TIME",
+        |    "notNull" : false,
+        |    "format" : "EPOCH|SECONDS",
+        |    "granularity" : "1:SECONDS"
+        |  } ]
         |}
         |""".stripMargin)
     pinotWrite.pinotSchema shouldEqual expectedPinotSchema
diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala
index 6001339c27..9d0f23374a 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala
@@ -19,6 +19,7 @@
 package org.apache.pinot.connector.spark.v3.datasource
 
 import org.apache.pinot.spi.data.FieldSpec
+import org.apache.pinot.spi.data.FieldSpec.FieldType
 import org.apache.spark.sql.types._
 import org.scalatest.funsuite.AnyFunSuite
 
@@ -40,11 +41,25 @@ class SparkToPinotTypeTranslatorTest extends AnyFunSuite {
     for ((sparkType, expectedPinotType) <- typeMappings) {
       val fieldName = s"${sparkType.simpleString}Field"
       val sparkSchema = StructType(Array(StructField(fieldName, sparkType)))
-      val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table")
+      val pinotSchema = SparkToPinotTypeTranslator.translate(
+        sparkSchema, "table", null, null, null)
       assert(pinotSchema.getFieldSpecFor(fieldName).getDataType == expectedPinotType)
     }
   }
 
+  test("Translate time column") {
+    val sparkSchema = StructType(Array(StructField("timeField", LongType)))
+    val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table", "timeField",
+      "EPOCH|SECONDS", "1:SECONDS")
+
+    val dateTimeField = pinotSchema.getDateTimeSpec("timeField")
+
+    assert(dateTimeField != null)
+    assert(dateTimeField.getFieldType == FieldType.DATE_TIME)
+    assert(dateTimeField.getFormat == "EPOCH|SECONDS")
+    assert(dateTimeField.getGranularity == "1:SECONDS")
+  }
+
   test("Translate multi value data types") {
     val arrayTypeMappings = List(
       (ArrayType(StringType), FieldSpec.DataType.STRING),
@@ -61,7 +76,8 @@ class SparkToPinotTypeTranslatorTest extends AnyFunSuite {
     for ((sparkArrayType, expectedPinotType) <- arrayTypeMappings) {
       val fieldName = s"${sparkArrayType.simpleString}Field"
       val sparkSchema = StructType(Array(StructField(fieldName, sparkArrayType)))
-      val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table")
+      val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table",
+        null, null, null)
+        null, null, null)
       assert(pinotSchema.getFieldSpecFor(fieldName).getDataType == expectedPinotType)
       assert(!pinotSchema.getFieldSpecFor(fieldName).isSingleValueField)
     }
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala
index b9bdee7daf..79627447f1 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala
@@ -29,6 +29,8 @@ object PinotDataSourceWriteOptions {
   val CONFIG_BLOOM_FILTER_COLUMNS = "bloomFilterColumns"
   val CONFIG_RANGE_INDEX_COLUMNS = "rangeIndexColumns"
   val CONFIG_TIME_COLUMN_NAME = "timeColumnName"
+  val CONFIG_TIME_FORMAT = "timeFormat"
+  val CONFIG_TIME_GRANULARITY = "timeGranularity"
 
   private[pinot] def from(options: util.Map[String, String]): PinotDataSourceWriteOptions = {
     if (!options.containsKey(CONFIG_TABLE_NAME)) {
@@ -46,6 +48,8 @@ object PinotDataSourceWriteOptions {
     val bloomFilterColumns = options.getOrDefault(CONFIG_BLOOM_FILTER_COLUMNS, "").split(",").filter(_.nonEmpty)
     val rangeIndexColumns = options.getOrDefault(CONFIG_RANGE_INDEX_COLUMNS, "").split(",").filter(_.nonEmpty)
     val timeColumnName = options.getOrDefault(CONFIG_TIME_COLUMN_NAME, null)
+    val timeFormat = options.getOrDefault(CONFIG_TIME_FORMAT, null)
+    val timeGranularity = options.getOrDefault(CONFIG_TIME_GRANULARITY, null)
 
     if (tableName == null) {
       throw new IllegalArgumentException("Table name is required")
@@ -56,12 +60,22 @@ object PinotDataSourceWriteOptions {
     if (segmentNameFormat == "") {
       throw new IllegalArgumentException("Segment name format cannot be empty string")
     }
+    if (timeColumnName != null) {
+      if (timeFormat == null) {
+        throw new IllegalArgumentException("Time format is required when time column name is specified")
+      }
+      if (timeGranularity == null) {
+        throw new IllegalArgumentException("Time granularity is required when time column name is specified")
+      }
+    }
 
     PinotDataSourceWriteOptions(
       tableName,
       segmentNameFormat,
       savePath,
       timeColumnName,
+      timeFormat,
+      timeGranularity,
       invertedIndexColumns,
       noDictionaryColumns,
       bloomFilterColumns,
@@ -76,6 +90,8 @@ private[pinot] case class PinotDataSourceWriteOptions(
                                                        segmentNameFormat: String,
                                                        savePath: String,
                                                        timeColumnName: String,
+                                                       timeFormat: String,
+                                                       timeGranularity: String,
                                                        invertedIndexColumns: Array[String],
                                                        noDictionaryColumns: Array[String],
                                                        bloomFilterColumns: Array[String],
diff --git a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala
index aca91d06ea..96d955ec87 100644
--- a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala
+++ b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala
@@ -31,7 +31,9 @@ class PinotDataSourceWriteOptionsTest extends BaseTest {
       PinotDataSourceWriteOptions.CONFIG_NO_DICTIONARY_COLUMNS -> "col3,col4",
       PinotDataSourceWriteOptions.CONFIG_BLOOM_FILTER_COLUMNS -> "col5,col6",
       PinotDataSourceWriteOptions.CONFIG_RANGE_INDEX_COLUMNS -> "col7,col8",
-      PinotDataSourceWriteOptions.CONFIG_TIME_COLUMN_NAME -> "timeCol"
+      PinotDataSourceWriteOptions.CONFIG_TIME_COLUMN_NAME -> "timeCol",
+      PinotDataSourceWriteOptions.CONFIG_TIME_FORMAT -> "EPOCH|SECONDS",
+      PinotDataSourceWriteOptions.CONFIG_TIME_GRANULARITY -> "1:SECONDS",
     )
 
     val pinotDataSourceWriteOptions = PinotDataSourceWriteOptions.from(options.asJava)
@@ -41,6 +43,8 @@ class PinotDataSourceWriteOptionsTest extends BaseTest {
       "segment_name",
       "/path/to/save",
       "timeCol",
+      "EPOCH|SECONDS",
+      "1:SECONDS",
       Array("col1", "col2"),
       Array("col3", "col4"),
       Array("col5", "col6"),
@@ -109,6 +113,8 @@ class PinotDataSourceWriteOptionsTest extends BaseTest {
       "tbl-{partitionId:03}",
       "/path/to/save",
       null,
+      null,
+      null,
       Array.empty,
       Array.empty,
       Array.empty,
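
    Aside on the segment name format used above: the PinotDataWriter change
    documents the variables {table}, {partitionId}, and now {startTime} and
    {endTime}, with optional zero padding such as {partitionId:03}, resolved via
    the Pattern shown in the diff. The following standalone sketch illustrates
    that substitution; the object and method names are illustrative and are not
    the committed implementation:

        import java.util.regex.Pattern

        object SegmentNameFormatSketch {
          // Replaces {variable} and {variable:NN} placeholders with values from
          // the map, zero-padding numeric values to NN digits when a width is given.
          def resolve(format: String, variables: Map[String, Any]): String = {
            val pattern = Pattern.compile("\\{(\\w+)(?::(\\d+))?}")
            val matcher = pattern.matcher(format)
            val result = new StringBuffer()
            while (matcher.find()) {
              val value = variables(matcher.group(1))
              val replacement = Option(matcher.group(2)) match {
                case Some(width) => String.format(s"%0${width.toInt}d", Long.box(value.toString.toLong))
                case None => value.toString
              }
              matcher.appendReplacement(result, replacement)
            }
            matcher.appendTail(result)
            result.toString
          }

          def main(args: Array[String]): Unit = {
            val vars = Map("partitionId" -> 12, "table" -> "airlineStats",
              "startTime" -> 1234567890L, "endTime" -> 1234567891L)
            // Prints: airlineStats_1234567890_1234567891_012
            println(resolve("{table}_{startTime}_{endTime}_{partitionId:03}", vars))
          }
        }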

