This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d0d4419488 [SparkConnector] Add proper time column support for Spark connector segment writer (#14556)
d0d4419488 is described below
commit d0d4419488d519ab237da4467e9e290ffb0717aa
Author: Caner Balci <[email protected]>
AuthorDate: Mon Dec 2 16:42:53 2024 -0800
[SparkConnector] Add proper time column support for Spark connector segment writer (#14556)
---
.../spark/v3/datasource/PinotDataWriter.scala | 38 +++++++++++++-
.../connector/spark/v3/datasource/PinotWrite.scala | 3 +-
.../v3/datasource/SparkToPinotTypeTranslator.scala | 22 +++++---
.../ExampleSparkPinotConnectorWriteTest.scala | 2 +
.../spark/v3/datasource/PinotDataWriterTest.scala | 60 ++++++++++++++++------
.../spark/v3/datasource/PinotWriteTest.scala | 19 +++++--
.../SparkToPinotTypeTranslatorTest.scala | 20 +++++++-
.../spark/common/PinotDataSourceWriteOptions.scala | 16 ++++++
.../common/PinotDataSourceWriteOptionsTest.scala | 8 ++-
9 files changed, 154 insertions(+), 34 deletions(-)
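For context, a minimal usage sketch of the options this patch wires through (option names follow the ExampleSparkPinotConnectorWriteTest change below; the data source format string and the table-name option key are assumptions, not shown in this excerpt):

    // Assumes an existing DataFrame `df` with a numeric epoch-seconds column "ts".
    df.write
      .format("pinot")                        // assumed format name
      .option("table", "airlineStats")        // assumed table-name option key
      .option("segmentNameFormat", "{table}_{startTime}_{endTime}_{partitionId:03}")
      .option("timeColumnName", "ts")
      .option("timeFormat", "EPOCH|SECONDS")
      .option("timeGranularity", "1:SECONDS")
      .save("/tmp/pinot-segments")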
diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
index 5a6a2a5ae3..41f86e4d2d 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
@@ -62,8 +62,29 @@ class PinotDataWriter[InternalRow](
private[pinot] val savePath: String = writeOptions.savePath
private[pinot] val bufferedRecordReader: PinotBufferedRecordReader = new PinotBufferedRecordReader()
+ private val timeColumnName = writeOptions.timeColumnName
+ private val timeColumnIndex = if (timeColumnName != null) writeSchema.fieldIndex(timeColumnName) else -1
+
+ private var isTimeColumnNumeric = false
+ if (timeColumnIndex > -1) {
+ isTimeColumnNumeric = writeSchema.fields(timeColumnIndex).dataType match {
+ case org.apache.spark.sql.types.IntegerType => true
+ case org.apache.spark.sql.types.LongType => true
+ case _ => false
+ }
+ }
+ private var startTime = Long.MaxValue
+ private var endTime = 0L
+
override def write(record: catalyst.InternalRow): Unit = {
bufferedRecordReader.write(internalRowToGenericRow(record))
+
+ // Tracking startTime and endTime for segment name generation purposes
+ if (timeColumnIndex > -1 && isTimeColumnNumeric) {
+ val time = record.getLong(timeColumnIndex)
+ startTime = Math.min(startTime, time)
+ endTime = Math.max(endTime, time)
+ }
}
override def commit(): WriterCommitMessage = {
@@ -77,7 +98,10 @@ class PinotDataWriter[InternalRow](
/** This method is used to generate the segment name based on the format
* provided in the write options (segmentNameFormat).
* The format can contain variables like {partitionId}.
- * Currently supported variables are `partitionId`, `table`
+ * Currently supported variables are `partitionId`, `table`, `startTime` and `endTime`
+ *
+ * `startTime` and `endTime` are the minimum and maximum values of the time column in the records
+ * and they are only available if the time column is numeric.
*
* It also supports the following, python inspired format specifier for digit formatting:
* `{partitionId:05}`
@@ -88,12 +112,15 @@ class PinotDataWriter[InternalRow](
* "{partitionId:05}_{table}" -> "00012_airlineStats"
* "{table}_{partitionId}" -> "airlineStats_12"
* "{table}_20240805" -> "airlineStats_20240805"
+ * "{table}_{startTime}_{endTime}_{partitionId:03}" ->
"airlineStats_1234567890_1234567891_012"
*/
private[pinot] def getSegmentName: String = {
val format = writeOptions.segmentNameFormat
val variables = Map(
"partitionId" -> partitionId,
"table" -> tableName,
+ "startTime" -> startTime,
+ "endTime" -> endTime,
)
val pattern = Pattern.compile("\\{(\\w+)(?::(\\d+))?}")
@@ -145,11 +172,14 @@ class PinotDataWriter[InternalRow](
indexingConfig: IndexingConfig,
outputDir: File,
): SegmentGeneratorConfig = {
+ val segmentsValidationAndRetentionConfig = new SegmentsValidationAndRetentionConfig()
+ segmentsValidationAndRetentionConfig.setTimeColumnName(timeColumnName)
+
// Mostly dummy tableConfig, sufficient for segment generation purposes
val tableConfig = new TableConfig(
tableName,
"OFFLINE",
- new SegmentsValidationAndRetentionConfig(),
+ segmentsValidationAndRetentionConfig,
new TenantConfig(null, null, null),
indexingConfig,
new TableCustomConfig(null),
@@ -192,6 +222,8 @@ class PinotDataWriter[InternalRow](
gr.putValue(field.name, record.getBoolean(idx))
case org.apache.spark.sql.types.ByteType =>
gr.putValue(field.name, record.getByte(idx))
+ case org.apache.spark.sql.types.BinaryType =>
+ gr.putValue(field.name, record.getBinary(idx))
case org.apache.spark.sql.types.ShortType =>
gr.putValue(field.name, record.getShort(idx))
case org.apache.spark.sql.types.ArrayType(elementType, _) =>
@@ -210,6 +242,8 @@ class PinotDataWriter[InternalRow](
gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Boolean]))
case org.apache.spark.sql.types.ByteType =>
gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Byte]))
+ case org.apache.spark.sql.types.BinaryType =>
+ gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Array[Byte]]))
case org.apache.spark.sql.types.ShortType =>
gr.putValue(field.name, record.getArray(idx).array.map(_.asInstanceOf[Short]))
case _ =>
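As an aside for readers of this hunk, here is a simplified sketch of the `{variable[:width]}` expansion described in the doc comment above, reusing the regex shown in getSegmentName; this is illustrative only, not the connector's exact implementation:

    import java.util.regex.Pattern

    // Expands "{table}_{startTime}_{endTime}_{partitionId:03}" style formats.
    // The optional ":NN" specifier zero-pads numeric values to NN digits.
    def expandSegmentName(format: String, variables: Map[String, Any]): String = {
      val matcher = Pattern.compile("\\{(\\w+)(?::(\\d+))?}").matcher(format)
      val sb = new StringBuffer
      while (matcher.find()) {
        val value = variables(matcher.group(1))
        val rendered = Option(matcher.group(2)) match {
          case Some(width) => s"%0${width.toInt}d".format(value.asInstanceOf[Number].longValue())
          case None => value.toString
        }
        matcher.appendReplacement(sb, rendered)
      }
      matcher.appendTail(sb)
      sb.toString
    }

    // expandSegmentName("{table}_{startTime}_{endTime}_{partitionId:03}",
    //   Map("table" -> "airlineStats", "startTime" -> 1234567890L,
    //       "endTime" -> 1234567891L, "partitionId" -> 12))
    // returns "airlineStats_1234567890_1234567891_012"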
diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala
index 0c0e441686..678da35f5e 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWrite.scala
@@ -28,7 +28,8 @@ class PinotWrite(
) extends Write with BatchWrite {
private[pinot] val writeOptions: PinotDataSourceWriteOptions = PinotDataSourceWriteOptions.from(logicalWriteInfo.options())
private[pinot] val writeSchema: StructType = logicalWriteInfo.schema()
- private[pinot] val pinotSchema: Schema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
+ private[pinot] val pinotSchema: Schema = SparkToPinotTypeTranslator.translate(
+ writeSchema, writeOptions.tableName, writeOptions.timeColumnName, writeOptions.timeFormat, writeOptions.timeGranularity)
override def createBatchWriterFactory(physicalWriteInfo: PhysicalWriteInfo): DataWriterFactory = {
// capture the values to allow lambda serialization
diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala
index 5584a02334..a26f923f94 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala
@@ -18,26 +18,32 @@
*/
package org.apache.pinot.connector.spark.v3.datasource
-import org.apache.pinot.spi.data.FieldSpec
-import org.apache.pinot.spi.data.Schema
+import org.apache.pinot.spi.data.{FieldSpec, Schema}
import org.apache.pinot.spi.data.Schema.SchemaBuilder
import org.apache.spark.sql.types._
object SparkToPinotTypeTranslator {
- // TODO: incorporate time column
- def translate(sparkSchema: StructType, tableName: String): Schema = {
+ def translate(sparkSchema: StructType,
+ tableName: String,
+ timeColumn: String,
+ timeFormat: String,
+ timeGranularity: String): Schema = {
val schemaBuilder = new SchemaBuilder
schemaBuilder.setSchemaName(tableName)
for (field <- sparkSchema.fields) {
val fieldName = field.name
val sparkType = field.dataType
val pinotType = translateType(sparkType)
+
if (pinotType != null) {
- if (sparkType.isInstanceOf[ArrayType]) {
- schemaBuilder.addMultiValueDimension(fieldName, pinotType)
- } else {
- schemaBuilder.addSingleValueDimension(fieldName, pinotType)
+ (fieldName, sparkType) match {
+ case (`timeColumn`, _) =>
+ schemaBuilder.addDateTime(fieldName, pinotType, timeFormat, timeGranularity);
+ case (_, _: ArrayType) =>
+ schemaBuilder.addMultiValueDimension(fieldName, pinotType)
+ case _ =>
+ schemaBuilder.addSingleValueDimension(fieldName, pinotType)
}
}
else throw new UnsupportedOperationException("Unsupported data type: " + sparkType)
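A short sketch of the updated translate signature in use; with a time column supplied, that field is registered as a DATE_TIME spec rather than a plain dimension (the assertions mirror SparkToPinotTypeTranslatorTest further down):

    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    val sparkSchema = StructType(Array(
      StructField("airport", StringType),
      StructField("ts", LongType)))

    // Passing null for the time column arguments keeps the old dimension-only behavior.
    val pinotSchema = SparkToPinotTypeTranslator.translate(
      sparkSchema, "airlineStats", "ts", "EPOCH|SECONDS", "1:SECONDS")

    assert(pinotSchema.getDateTimeSpec("ts").getFormat == "EPOCH|SECONDS")
    assert(pinotSchema.getDateTimeSpec("ts").getGranularity == "1:SECONDS")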
diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala
index 45343a8999..64582c9df8 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorWriteTest.scala
@@ -55,6 +55,8 @@ object ExampleSparkPinotConnectorWriteTest extends Logging {
.option("noDictionaryColumns", "airport,state")
.option("bloomFilterColumns", "airport")
.option("timeColumnName", "ts")
+ .option("timeFormat", "EPOCH|SECONDS")
+ .option("timeGranularity", "1:SECONDS")
.save("myPath")
}
}
diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala
index 33149ef952..1653785456 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala
@@ -20,7 +20,7 @@ package org.apache.pinot.connector.spark.v3.datasource
import org.apache.pinot.connector.spark.common.PinotDataSourceWriteOptions
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType, BinaryType}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.scalatest.matchers.should.Matchers
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -55,6 +55,8 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
tableName = "testTable",
savePath = "/tmp/pinot",
timeColumnName = "ts",
+ timeFormat = "EPOCH|SECONDS",
+ timeGranularity = "1:SECONDS",
segmentNameFormat = "{table}_{partitionId:03}",
invertedIndexColumns = Array("name"),
noDictionaryColumns = Array("age"),
@@ -63,14 +65,18 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
)
val writeSchema = StructType(Seq(
StructField("name", StringType, nullable = false),
- StructField("age", IntegerType, nullable = false)
+ StructField("age", IntegerType, nullable = false),
+ StructField("ts", LongType, nullable = false),
+ StructField("bin", BinaryType, nullable = false),
))
- val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
+ val pinotSchema = SparkToPinotTypeTranslator.translate(
+ writeSchema, writeOptions.tableName, writeOptions.timeColumnName,
+ writeOptions.timeFormat, writeOptions.timeGranularity)
val writer = new PinotDataWriter[InternalRow](0, 0, writeOptions, writeSchema, pinotSchema)
- val record1 = new TestInternalRow(Array[Any]("Alice", 30))
- val record2 = new TestInternalRow(Array[Any]("Bob", 25))
+ val record1 = new TestInternalRow(Array[Any]("Alice", 30, 1234567890L, "Alice".getBytes))
+ val record2 = new TestInternalRow(Array[Any]("Bob", 25, 1234567891L, "Bob".getBytes))
writer.write(record1)
writer.write(record2)
@@ -92,7 +98,9 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
tableName = "testTable",
savePath = tmpDir.getAbsolutePath,
timeColumnName = "ts",
- segmentNameFormat = "{table}_{partitionId:03}",
+ timeFormat = "EPOCH|SECONDS",
+ timeGranularity = "1:SECONDS",
+ segmentNameFormat = "{table}_{startTime}_{endTime}_{partitionId:03}",
invertedIndexColumns = Array("name"),
noDictionaryColumns = Array("age"),
bloomFilterColumns = Array("name"),
@@ -100,26 +108,32 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
)
val writeSchema = StructType(Seq(
StructField("name", StringType, nullable = false),
- StructField("age", IntegerType, nullable = false)
+ StructField("age", IntegerType, nullable = false),
+ StructField("ts", LongType, nullable = false),
+ StructField("bin", BinaryType, nullable = false),
))
- val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
+ val pinotSchema = SparkToPinotTypeTranslator.translate(
+ writeSchema, writeOptions.tableName, writeOptions.timeColumnName,
+ writeOptions.timeFormat, writeOptions.timeGranularity)
val writer = new PinotDataWriter[InternalRow](0, 0, writeOptions, writeSchema, pinotSchema)
- val record1 = new TestInternalRow(Array[Any]("Alice", 30))
+ val record1 = new TestInternalRow(Array[Any]("Alice", 30, 1234567890L, "Alice".getBytes))
writer.write(record1)
+ val record2 = new TestInternalRow(Array[Any]("Bob", 25, 1234567891L, "Bob".getBytes))
+ writer.write(record2)
+ writer.write(record2)
val commitMessage: WriterCommitMessage = writer.commit()
commitMessage shouldBe a[SuccessWriterCommitMessage]
// Verify that the segment is created and stored in the target location
val fs = FileSystem.get(new URI(writeOptions.savePath), new org.apache.hadoop.conf.Configuration())
- val segmentPath = new Path(writeOptions.savePath + "/testTable_000.tar.gz")
+ val segmentPath = new Path(writeOptions.savePath + "/testTable_1234567890_1234567891_000.tar.gz")
fs.exists(segmentPath) shouldBe true
// Verify the contents of the segment tar file
TarCompressionUtils.untar(
- new File(writeOptions.savePath + "/testTable_000.tar.gz"),
+ new File(writeOptions.savePath + "/testTable_1234567890_1234567891_000.tar.gz"),
new File(writeOptions.savePath))
- val untarDir = Paths.get(writeOptions.savePath + "/testTable_000/v3/")
+ val untarDir = Paths.get(writeOptions.savePath + "/testTable_1234567890_1234567891_000/v3/")
Files.exists(untarDir) shouldBe true
val segmentFiles = Files.list(untarDir).toArray.map(_.toString)
@@ -132,14 +146,19 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
val metadataSrc = Source.fromFile(untarDir + "/metadata.properties")
val metadataContent = metadataSrc.getLines.mkString("\n")
metadataSrc.close()
- metadataContent should include ("segment.name = testTable_000")
+
+ metadataContent should include ("segment.name = testTable_1234567890_1234567891_000")
+ metadataContent should include ("segment.time.column.name = ts")
+ metadataContent should include ("segment.start.time = 1234567890")
+ metadataContent should include ("segment.end.time = 1234567891")
}
test("getSegmentName should format segment name correctly with custom
format") {
val testCases = Seq(
("{table}_{partitionId}", "airlineStats_12"),
("{partitionId:05}_{table}", "00012_airlineStats"),
- ("{table}_20240805", "airlineStats_20240805")
+ ("{table}_20240805", "airlineStats_20240805"),
+ ("{table}_{startTime}_{endTime}_{partitionId:03}",
"airlineStats_1234567890_1234567891_012"),
)
testCases.foreach { case (format, expected) =>
@@ -147,19 +166,26 @@ class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter
tableName = "airlineStats",
savePath = "/tmp/pinot",
timeColumnName = "ts",
+ timeFormat = "EPOCH|SECONDS",
+ timeGranularity = "1:SECONDS",
segmentNameFormat = format,
invertedIndexColumns = Array("name"),
noDictionaryColumns = Array("age"),
bloomFilterColumns = Array("name"),
- rangeIndexColumns = Array()
+ rangeIndexColumns = Array(),
)
val writeSchema = StructType(Seq(
StructField("name", StringType, nullable = false),
- StructField("age", IntegerType, nullable = false)
+ StructField("age", IntegerType, nullable = false),
+ StructField("ts", LongType, nullable = false),
))
- val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName)
+ val pinotSchema = SparkToPinotTypeTranslator.translate(
+ writeSchema, writeOptions.tableName, writeOptions.timeColumnName,
+ writeOptions.timeFormat, writeOptions.timeGranularity)
val writer = new PinotDataWriter[InternalRow](12, 0, writeOptions, writeSchema, pinotSchema)
+ writer.write(new TestInternalRow(Array[Any]("Alice", 30, 1234567890L)))
+ writer.write(new TestInternalRow(Array[Any]("Bob", 25, 1234567891L)))
val segmentName = writer.getSegmentName
diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala
index c9197a67b9..0a30f06529 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotWriteTest.scala
@@ -20,7 +20,7 @@ package org.apache.pinot.connector.spark.v3.datasource
import org.apache.pinot.spi.data.Schema
import org.apache.spark.sql.connector.write.LogicalWriteInfo
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
@@ -35,6 +35,8 @@ class PinotWriteTest extends AnyFunSuite with Matchers {
"segmentNameFormat" -> "my_segment_format",
"path" -> "/path/to/save",
"timeColumnName" -> "timeCol",
+ "timeFormat" -> "EPOCH|SECONDS",
+ "timeGranularity" -> "1:SECONDS",
"invertedIndexColumns" -> "col1,col2",
"noDictionaryColumns" -> "col3,col4",
"bloomFilterColumns" -> "col5,col6",
@@ -43,7 +45,8 @@ class PinotWriteTest extends AnyFunSuite with Matchers {
val schema = StructType(Seq(
StructField("col1", StringType),
- StructField("col2", StringType)
+ StructField("col2", StringType),
+ StructField("timeCol", LongType),
))
val logicalWriteInfo = new TestLogicalWriteInfo(new CaseInsensitiveStringMap(options.asJava), schema)
@@ -54,6 +57,8 @@ class PinotWriteTest extends AnyFunSuite with Matchers {
pinotWrite.writeOptions.segmentNameFormat shouldEqual "my_segment_format"
pinotWrite.writeOptions.savePath shouldEqual "/path/to/save"
pinotWrite.writeOptions.timeColumnName shouldEqual "timeCol"
+ pinotWrite.writeOptions.timeFormat shouldEqual "EPOCH|SECONDS"
+ pinotWrite.writeOptions.timeGranularity shouldEqual "1:SECONDS"
pinotWrite.writeOptions.invertedIndexColumns shouldEqual Array("col1", "col2")
pinotWrite.writeOptions.noDictionaryColumns shouldEqual Array("col3", "col4")
pinotWrite.writeOptions.bloomFilterColumns shouldEqual Array("col5", "col6")
@@ -68,7 +73,15 @@ class PinotWriteTest extends AnyFunSuite with Matchers {
| "dimensionFieldSpecs": [
| {"name": "col1", "dataType": "STRING"},
| {"name": "col2", "dataType": "STRING"}
- | ]
+ | ],
+ | "dateTimeFieldSpecs" : [ {
+ | "name" : "timeCol",
+ | "dataType" : "LONG",
+ | "fieldType" : "DATE_TIME",
+ | "notNull" : false,
+ | "format" : "EPOCH|SECONDS",
+ | "granularity" : "1:SECONDS"
+ | } ]
|}
|""".stripMargin)
pinotWrite.pinotSchema shouldEqual expectedPinotSchema
diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala
index 6001339c27..9d0f23374a 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala
@@ -19,6 +19,7 @@
package org.apache.pinot.connector.spark.v3.datasource
import org.apache.pinot.spi.data.FieldSpec
+import org.apache.pinot.spi.data.FieldSpec.FieldType
import org.apache.spark.sql.types._
import org.scalatest.funsuite.AnyFunSuite
@@ -40,11 +41,25 @@ class SparkToPinotTypeTranslatorTest extends AnyFunSuite {
for ((sparkType, expectedPinotType) <- typeMappings) {
val fieldName = s"${sparkType.simpleString}Field"
val sparkSchema = StructType(Array(StructField(fieldName, sparkType)))
- val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table")
+ val pinotSchema = SparkToPinotTypeTranslator.translate(
+ sparkSchema, "table", null, null, null)
assert(pinotSchema.getFieldSpecFor(fieldName).getDataType == expectedPinotType)
}
}
+ test("Translate time column") {
+ val sparkSchema = StructType(Array(StructField("timeField", LongType)))
+ val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table", "timeField",
+ "EPOCH|SECONDS", "1:SECONDS")
+
+ val dateTimeField = pinotSchema.getDateTimeSpec("timeField")
+
+ assert(dateTimeField != null)
+ assert(dateTimeField.getFieldType == FieldType.DATE_TIME)
+ assert(dateTimeField.getFormat == "EPOCH|SECONDS")
+ assert(dateTimeField.getGranularity == "1:SECONDS")
+ }
+
test("Translate multi value data types") {
val arrayTypeMappings = List(
(ArrayType(StringType), FieldSpec.DataType.STRING),
@@ -61,7 +76,8 @@ class SparkToPinotTypeTranslatorTest extends AnyFunSuite {
for ((sparkArrayType, expectedPinotType) <- arrayTypeMappings) {
val fieldName = s"${sparkArrayType.simpleString}Field"
val sparkSchema = StructType(Array(StructField(fieldName, sparkArrayType)))
- val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table")
+ val pinotSchema = SparkToPinotTypeTranslator.translate(sparkSchema, "table",
+ null, null, null)
assert(pinotSchema.getFieldSpecFor(fieldName).getDataType == expectedPinotType)
assert(!pinotSchema.getFieldSpecFor(fieldName).isSingleValueField)
}
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala
index b9bdee7daf..79627447f1 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala
@@ -29,6 +29,8 @@ object PinotDataSourceWriteOptions {
val CONFIG_BLOOM_FILTER_COLUMNS = "bloomFilterColumns"
val CONFIG_RANGE_INDEX_COLUMNS = "rangeIndexColumns"
val CONFIG_TIME_COLUMN_NAME = "timeColumnName"
+ val CONFIG_TIME_FORMAT = "timeFormat"
+ val CONFIG_TIME_GRANULARITY = "timeGranularity"
private[pinot] def from(options: util.Map[String, String]): PinotDataSourceWriteOptions = {
if (!options.containsKey(CONFIG_TABLE_NAME)) {
@@ -46,6 +48,8 @@ object PinotDataSourceWriteOptions {
val bloomFilterColumns = options.getOrDefault(CONFIG_BLOOM_FILTER_COLUMNS, "").split(",").filter(_.nonEmpty)
val rangeIndexColumns = options.getOrDefault(CONFIG_RANGE_INDEX_COLUMNS, "").split(",").filter(_.nonEmpty)
val timeColumnName = options.getOrDefault(CONFIG_TIME_COLUMN_NAME, null)
+ val timeFormat = options.getOrDefault(CONFIG_TIME_FORMAT, null)
+ val timeGranularity = options.getOrDefault(CONFIG_TIME_GRANULARITY, null)
if (tableName == null) {
throw new IllegalArgumentException("Table name is required")
@@ -56,12 +60,22 @@ object PinotDataSourceWriteOptions {
if (segmentNameFormat == "") {
throw new IllegalArgumentException("Segment name format cannot be empty
string")
}
+ if (timeColumnName != null) {
+ if (timeFormat == null) {
+ throw new IllegalArgumentException("Time format is required when time
column name is specified")
+ }
+ if (timeGranularity == null) {
+ throw new IllegalArgumentException("Time granularity is required when
time column name is specified")
+ }
+ }
PinotDataSourceWriteOptions(
tableName,
segmentNameFormat,
savePath,
timeColumnName,
+ timeFormat,
+ timeGranularity,
invertedIndexColumns,
noDictionaryColumns,
bloomFilterColumns,
@@ -76,6 +90,8 @@ private[pinot] case class PinotDataSourceWriteOptions(
segmentNameFormat: String,
savePath: String,
timeColumnName: String,
+ timeFormat: String,
+ timeGranularity: String,
invertedIndexColumns: Array[String],
noDictionaryColumns: Array[String],
bloomFilterColumns: Array[String],
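To illustrate the validation added in this hunk, a small sketch (the time-related option keys match the constants above; the table and path keys are assumptions, since their string values are not shown in this excerpt):

    import scala.collection.JavaConverters._

    // timeColumnName is set but timeFormat/timeGranularity are omitted, so
    // PinotDataSourceWriteOptions.from(...) is expected to throw:
    //   IllegalArgumentException("Time format is required when time column name is specified")
    val badOptions = Map(
      "table" -> "airlineStats",                          // assumed key
      "segmentNameFormat" -> "{table}_{partitionId:03}",
      "path" -> "/tmp/pinot",                             // assumed key
      "timeColumnName" -> "ts"
    ).asJava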
diff --git a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala
index aca91d06ea..96d955ec87 100644
--- a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala
+++ b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptionsTest.scala
@@ -31,7 +31,9 @@ class PinotDataSourceWriteOptionsTest extends BaseTest {
PinotDataSourceWriteOptions.CONFIG_NO_DICTIONARY_COLUMNS -> "col3,col4",
PinotDataSourceWriteOptions.CONFIG_BLOOM_FILTER_COLUMNS -> "col5,col6",
PinotDataSourceWriteOptions.CONFIG_RANGE_INDEX_COLUMNS -> "col7,col8",
- PinotDataSourceWriteOptions.CONFIG_TIME_COLUMN_NAME -> "timeCol"
+ PinotDataSourceWriteOptions.CONFIG_TIME_COLUMN_NAME -> "timeCol",
+ PinotDataSourceWriteOptions.CONFIG_TIME_FORMAT -> "EPOCH|SECONDS",
+ PinotDataSourceWriteOptions.CONFIG_TIME_GRANULARITY -> "1:SECONDS",
)
val pinotDataSourceWriteOptions = PinotDataSourceWriteOptions.from(options.asJava)
@@ -41,6 +43,8 @@ class PinotDataSourceWriteOptionsTest extends BaseTest {
"segment_name",
"/path/to/save",
"timeCol",
+ "EPOCH|SECONDS",
+ "1:SECONDS",
Array("col1", "col2"),
Array("col3", "col4"),
Array("col5", "col6"),
@@ -109,6 +113,8 @@ class PinotDataSourceWriteOptionsTest extends BaseTest {
"tbl-{partitionId:03}",
"/path/to/save",
null,
+ null,
+ null,
Array.empty,
Array.empty,
Array.empty,