cbalci commented on code in PR #13748: URL: https://github.com/apache/pinot/pull/13748#discussion_r1707888530
########## pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala: ########## @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.commons.io.FileUtils +import org.apache.pinot.common.utils.TarGzCompressionUtils +import org.apache.pinot.connector.spark.common.PinotDataSourceWriteOptions +import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage} +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig +import org.apache.pinot.spi.config.table.{IndexingConfig, SegmentsValidationAndRetentionConfig, TableConfig, TableCustomConfig, TenantConfig} +import org.apache.pinot.spi.data.readers.GenericRow +import org.apache.pinot.spi.data.Schema +import org.apache.pinot.spi.ingestion.batch.spec.Constants +import org.apache.pinot.spi.utils.DataSizeUtils +import org.apache.spark.sql.catalyst +import org.apache.spark.sql.types.StructType +import org.slf4j.{Logger, LoggerFactory} + +import java.io.File +import java.nio.file.Files +import java.util.regex.Pattern + +class PinotDataWriter[InternalRow]( + partitionId: Int, + taskId: Long, + writeOptions: PinotDataSourceWriteOptions, + writeSchema: StructType, + pinotSchema: Schema) + extends DataWriter[org.apache.spark.sql.catalyst.InternalRow] with AutoCloseable { + private val logger: Logger = LoggerFactory.getLogger(classOf[PinotDataWriter[InternalRow]]) + logger.info("PinotDataWriter created with writeOptions: {}, partitionId: {}, taskId: {}", + (writeOptions, partitionId, taskId)) + + val tableName: String = writeOptions.tableName Review Comment: I only left them non private for testing convenience but I remember I could use scala's scope modifiers as: ``` private[pinot] val tableName: String = writeOptions.tableName ``` which I'll use. Thanks ########## pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala: ########## @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.commons.io.FileUtils +import org.apache.pinot.common.utils.TarGzCompressionUtils +import org.apache.pinot.connector.spark.common.PinotDataSourceWriteOptions +import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage} +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig +import org.apache.pinot.spi.config.table.{IndexingConfig, SegmentsValidationAndRetentionConfig, TableConfig, TableCustomConfig, TenantConfig} +import org.apache.pinot.spi.data.readers.GenericRow +import org.apache.pinot.spi.data.Schema +import org.apache.pinot.spi.ingestion.batch.spec.Constants +import org.apache.pinot.spi.utils.DataSizeUtils +import org.apache.spark.sql.catalyst +import org.apache.spark.sql.types.StructType +import org.slf4j.{Logger, LoggerFactory} + +import java.io.File +import java.nio.file.Files +import java.util.regex.Pattern + +class PinotDataWriter[InternalRow]( + partitionId: Int, + taskId: Long, + writeOptions: PinotDataSourceWriteOptions, + writeSchema: StructType, + pinotSchema: Schema) + extends DataWriter[org.apache.spark.sql.catalyst.InternalRow] with AutoCloseable { + private val logger: Logger = LoggerFactory.getLogger(classOf[PinotDataWriter[InternalRow]]) + logger.info("PinotDataWriter created with writeOptions: {}, partitionId: {}, taskId: {}", + (writeOptions, partitionId, taskId)) + + val tableName: String = writeOptions.tableName + val savePath: String = writeOptions.savePath + val bufferedRecordReader: PinotBufferedRecordReader = new PinotBufferedRecordReader() + + override def write(record: catalyst.InternalRow): Unit = { + bufferedRecordReader.write(internalRowToGenericRow(record)) + } + + override def commit(): WriterCommitMessage = { + val segmentName = getSegmentName + val segmentDir = generateSegment(segmentName) + val segmentTarFile = tarSegmentDir(segmentName, segmentDir) + pushSegmentTarFile(segmentTarFile) + new SuccessWriterCommitMessage(segmentName) + } + + // This method is used to generate the segment name based on the format provided in the write options + // The format can contain variables like {partitionId} + // Currently supported variables are `partitionId`, `table` + // It also supports the following, python inspired format specifier for digit formatting: + // `{partitionId:05}` + // which will zero pad partitionId up to five characters. + // + // Some examples: Review Comment: This utility supports a mini template engine with some variables. For now `partitionId` and `table` are supported but I'm thinking in the future we may want "data derived" variables such as `maxTimestamp` `minTimestamp`. The order is really up to user, they can hardcode certain parts of the segment name, or use the variable syntax "{variable}" to be replaced in this utility. ########## pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriterTest.scala: ########## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.pinot.connector.spark.common.PinotDataSourceWriteOptions +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.connector.write.WriterCommitMessage +import org.scalatest.matchers.should.Matchers +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.pinot.common.utils.TarGzCompressionUtils +import org.apache.pinot.spi.data.readers.GenericRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.scalatest.BeforeAndAfter +import org.scalatest.funsuite.AnyFunSuite + +import java.io.File +import java.net.URI +import java.nio.file.{Files, Paths} +import scala.io.Source + +class PinotDataWriterTest extends AnyFunSuite with Matchers with BeforeAndAfter { + + var tmpDir: File = _ + + before { + tmpDir = Files.createTempDirectory("pinot-spark-connector-write-test").toFile + } + + after { + if (tmpDir.exists()) { + tmpDir.listFiles().foreach(_.delete()) + tmpDir.delete() + } + } + + test("Initialize buffer and accept records") { + val writeOptions = PinotDataSourceWriteOptions( + tableName = "testTable", + savePath = "/tmp/pinot", + timeColumnName = "ts", + segmentNameFormat = "{table}_{partitionId:03}", + invertedIndexColumns = Array("name"), + noDictionaryColumns = Array("age"), + bloomFilterColumns = Array("name"), + rangeIndexColumns = Array() + ) + val writeSchema = StructType(Seq( + StructField("name", StringType, nullable = false), + StructField("age", IntegerType, nullable = false) + )) + + val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName) + val writer = new PinotDataWriter[InternalRow](0, 0, writeOptions, writeSchema, pinotSchema) + + val record1 = new TestInternalRow(Array[Any]("Alice", 30)) + val record2 = new TestInternalRow(Array[Any]("Bob", 25)) + + writer.write(record1) + writer.write(record2) + + val writeBuffer = writer.bufferedRecordReader + writer.bufferedRecordReader.hasNext shouldBe true + writeBuffer.next() shouldBe a[GenericRow] + writeBuffer.next() shouldBe a[GenericRow] + + writer.close() + writeBuffer.hasNext shouldBe false + } + + test("Should create segment file on commit") { + // create tmp directory with test name + tmpDir = Files.createTempDirectory("pinot-spark-connector-test").toFile + + val writeOptions = PinotDataSourceWriteOptions( + tableName = "testTable", + savePath = tmpDir.getAbsolutePath, + timeColumnName = "ts", + segmentNameFormat = "{table}_{partitionId:03}", + invertedIndexColumns = Array("name"), + noDictionaryColumns = Array("age"), + bloomFilterColumns = Array("name"), + rangeIndexColumns = Array() + ) + val writeSchema = StructType(Seq( + StructField("name", StringType, nullable = false), + StructField("age", IntegerType, nullable = false) + )) + val pinotSchema = SparkToPinotTypeTranslator.translate(writeSchema, writeOptions.tableName) + val writer = new PinotDataWriter[InternalRow](0, 0, writeOptions, writeSchema, pinotSchema) + val record1 = new TestInternalRow(Array[Any]("Alice", 30)) + writer.write(record1) + + val commitMessage: WriterCommitMessage = writer.commit() + commitMessage shouldBe a[SuccessWriterCommitMessage] + + // Verify that the segment is created and stored in the target location + val fs = FileSystem.get(new URI(writeOptions.savePath), new org.apache.hadoop.conf.Configuration()) + val segmentPath = new Path(writeOptions.savePath + "/testTable_000.tar.gz") + fs.exists(segmentPath) shouldBe true + + // Verify the contents of the segment tar file + TarGzCompressionUtils.untar( + new File(writeOptions.savePath + "/testTable_000.tar.gz"), + new File(writeOptions.savePath)) + val untarDir = Paths.get(writeOptions.savePath + "/testTable_000/v3/") + Files.exists(untarDir) shouldBe true + + val segmentFiles = Files.list(untarDir).toArray.map(_.toString) + segmentFiles should contain (untarDir + "/creation.meta") + segmentFiles should contain (untarDir + "/index_map") + segmentFiles should contain (untarDir + "/metadata.properties") + segmentFiles should contain (untarDir + "/columns.psf") + + // Verify basic metadata content + val metadataSrc = Source.fromFile(untarDir + "/metadata.properties") + val metadataContent = metadataSrc.getLines.mkString("\n") + metadataSrc.close() + metadataContent should include ("segment.name = testTable_000") + } + + test("getSegmentName should format segment name correctly with custom format") { + val testCases = Seq( Review Comment: It's a bit tricky to handle a broken template at this level. We already have a validation at `PinotDataSourceWriteOptions` to ensure non-empty string, but I'm not sure what is the best way to detect and handle "{table". We don't require the user to use all variables so it will just appear as "{table" in the segment name. ########## pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceWriteOptions.scala: ########## @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.common + +import java.util + +object PinotDataSourceWriteOptions { + val CONFIG_TABLE_NAME = "table" + val CONFIG_SEGMENT_NAME_FORMAT = "segmentNameFormat" + val CONFIG_PATH = "path" + val CONFIG_INVERTED_INDEX_COLUMNS = "invertedIndexColumns" + val CONFIG_NO_DICTIONARY_COLUMNS = "noDictionaryColumns" + val CONFIG_BLOOM_FILTER_COLUMNS = "bloomFilterColumns" + val CONFIG_RANGE_INDEX_COLUMNS = "rangeIndexColumns" + val CONFIG_TIME_COLUMN_NAME = "timeColumnName" + + private[pinot] def from(options: util.Map[String, String]): PinotDataSourceWriteOptions = { Review Comment: That's a good suggestion. I'll leave a TODO for now, but open to suggestion on API design. Do you say something like this would suffice? Tar (default) ``` airlineStats.write.format("pinot") \ .mode("append") \ .option("table", "airlineStats") \ .option("outputMode", "tar") \ .save("/my/segments/path") ``` Full directory ``` airlineStats.write.format("pinot") \ .mode("append") \ .option("table", "airlineStats") \ .option("outputMode", "directory") \ .save("/my/segments/path") ########## pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotBufferedRecordReader.scala: ########## @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.pinot.spi.data.readers.{GenericRow, RecordReader, RecordReaderConfig} + +import java.io.File +import java.util + +/** + * A buffered record reader that stores the records in memory and allows for iteration over them. + * This is useful to satisfy the RecordReader interface in Pinot, and also allow for Spark executor + * to write records. + * + * To improve resilience, we may write records to disk when memory is full, Review Comment: Sounds good, updated as a TODO. ########## pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala: ########## @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.commons.io.FileUtils +import org.apache.pinot.common.utils.TarGzCompressionUtils +import org.apache.pinot.connector.spark.common.PinotDataSourceWriteOptions +import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage} +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig +import org.apache.pinot.spi.config.table.{IndexingConfig, SegmentsValidationAndRetentionConfig, TableConfig, TableCustomConfig, TenantConfig} +import org.apache.pinot.spi.data.readers.GenericRow +import org.apache.pinot.spi.data.Schema +import org.apache.pinot.spi.ingestion.batch.spec.Constants +import org.apache.pinot.spi.utils.DataSizeUtils +import org.apache.spark.sql.catalyst +import org.apache.spark.sql.types.StructType +import org.slf4j.{Logger, LoggerFactory} + +import java.io.File +import java.nio.file.Files +import java.util.regex.Pattern + +class PinotDataWriter[InternalRow]( + partitionId: Int, + taskId: Long, + writeOptions: PinotDataSourceWriteOptions, + writeSchema: StructType, + pinotSchema: Schema) + extends DataWriter[org.apache.spark.sql.catalyst.InternalRow] with AutoCloseable { + private val logger: Logger = LoggerFactory.getLogger(classOf[PinotDataWriter[InternalRow]]) + logger.info("PinotDataWriter created with writeOptions: {}, partitionId: {}, taskId: {}", + (writeOptions, partitionId, taskId)) + + val tableName: String = writeOptions.tableName + val savePath: String = writeOptions.savePath + val bufferedRecordReader: PinotBufferedRecordReader = new PinotBufferedRecordReader() + + override def write(record: catalyst.InternalRow): Unit = { + bufferedRecordReader.write(internalRowToGenericRow(record)) Review Comment: Hmm, Do you suggest we publish this interface and the user (Spark application) passes in a segment name generator as a serialized class? That would be very flexible. But I was thinking a something simpler; we add two new template variables 'minTimestamp' and 'maxTimestamp' and in combination with 'table' and 'partitionId' they would cover most scenarios. I'll give both a thought in the context of a real-life use case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org