This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 5a89491ce1  [spark-connector] Memory optimization for GRPC data fetcher (#10209)
5a89491ce1 is described below

commit 5a89491ce17618a60a8ed952bf7da59e291aae11
Author: Caner Balci <canerba...@gmail.com>
AuthorDate: Thu Feb 2 08:54:44 2023 -0800

    [spark-connector] Memory optimization for GRPC data fetcher (#10209)

    * Return an Iterator from the dataFetcher rather than a List
---
 .../connector/PinotGrpcServerDataFetcher.scala     | 17 +++++++------
 .../datasource/PinotInputPartitionReader.scala     | 29 ++++++++++++++--------
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
index f543b5b7c9..5c4bd3e48e 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
@@ -25,6 +25,7 @@ import org.apache.pinot.common.proto.Server.ServerRequest
 import org.apache.pinot.connector.spark.utils.Logging
 import org.apache.pinot.spi.config.table.TableType
 
+import java.io.Closeable
 import scala.collection.JavaConverters._
 
 /**
@@ -32,7 +33,7 @@ import scala.collection.JavaConverters._
  * Eg: offline-server1: segment1, segment2, segment3
  */
 private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
-  extends Logging {
+  extends Logging with Closeable {
 
   private val channel = ManagedChannelBuilder
     .forAddress(pinotSplit.serverAndSegments.serverHost, pinotSplit.serverAndSegments.serverGrpcPort)
@@ -41,8 +42,7 @@ private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
     .asInstanceOf[ManagedChannelBuilder[_]].build()
   private val pinotServerBlockingStub = PinotQueryServerGrpc.newBlockingStub(channel)
 
-  def fetchData(): List[DataTable] = {
-    val requestStartTime = System.nanoTime()
+  def fetchData(): Iterator[DataTable] = {
     val request = ServerRequest.newBuilder()
       .putMetadata("enableStreaming", "true")
       .addAllSegments(pinotSplit.serverAndSegments.segments.asJava)
@@ -56,20 +56,23 @@
       )
       .build()
     val serverResponse = pinotServerBlockingStub.submit(request)
-    logInfo(s"Pinot server total response time in millis: ${System.nanoTime() - requestStartTime}")
-
     try {
       val dataTables = for {
-        serverResponse <- serverResponse.asScala.toList
+        serverResponse <- serverResponse.asScala
         if serverResponse.getMetadataMap.get("responseType") == "data"
       } yield DataTableFactory.getDataTable(serverResponse.getPayload.toByteArray)
 
       dataTables.filter(_.getNumberOfRows > 0)
+
     } catch {
       case e: io.grpc.StatusRuntimeException =>
         logError(s"Caught exception when reading data from ${pinotSplit.serverAndSegments.serverHost}:${pinotSplit.serverAndSegments.serverGrpcPort}: ${e}")
         throw e
-    } finally {
+    }
+  }
+
+  def close(): Unit = {
+    if (!channel.isShutdown) {
       channel.shutdown()
       logInfo("Pinot server connection closed")
     }
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
index de42b7a6cf..ba3966f472 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
@@ -23,9 +23,11 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
 import org.apache.spark.sql.types.StructType
 
+import java.io.Closeable
+
 /**
  * Actual data reader on spark worker side.
- * Represents a spark partition, and is receive data from specified pinot server-segment list.
+ * Represents a spark partition, and receives data from specified pinot server-segment list.
  */
 class PinotInputPartitionReader(
     schema: StructType,
@@ -33,7 +35,8 @@ class PinotInputPartitionReader(
     pinotSplit: PinotSplit,
     dataSourceOptions: PinotDataSourceReadOptions)
   extends InputPartitionReader[InternalRow] {
-  private val responseIterator: Iterator[InternalRow] = fetchDataAndConvertToInternalRows()
+
+  private val (responseIterator: Iterator[InternalRow], source: Closeable) = getIteratorAndSource()
   private[this] var currentRow: InternalRow = _
 
   override def next(): Boolean = {
@@ -48,18 +51,22 @@
     currentRow
   }
 
-  override def close(): Unit = {}
+  override def close(): Unit = {
+    source.close()
+  }
 
-  private def fetchDataAndConvertToInternalRows(): Iterator[InternalRow] = {
-    if (dataSourceOptions.useGrpcServer)
-      PinotGrpcServerDataFetcher(pinotSplit)
-        .fetchData()
+  private def getIteratorAndSource(): (Iterator[InternalRow], Closeable) = {
+    if (dataSourceOptions.useGrpcServer) {
+      val dataFetcher = PinotGrpcServerDataFetcher(pinotSplit)
+      val iterable = dataFetcher.fetchData()
         .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
-        .toIterator
-    else
-      PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions)
+      (iterable, dataFetcher)
+    } else {
+      (PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions)
         .fetchData()
         .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
-        .toIterator
+        .toIterator,
+      () => {})
+    }
   }
 }
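The substance of the change, as the diff above shows, is that fetchData() now returns a lazily evaluated Iterator[DataTable] over the gRPC streaming response instead of collecting every DataTable into a List first, and the channel shutdown moves out of a finally block into a Closeable.close() that PinotInputPartitionReader calls once it is done reading. A minimal, self-contained Scala sketch of that Iterator-vs-List distinction; the names below (PretendResponse, fetchAll, fetchLazily) are hypothetical stand-ins, not Pinot or gRPC APIs:

// Contrasts materializing every response in a List with yielding payloads
// lazily through an Iterator, which is the shape of the change above.
object IteratorVsList {
  final case class PretendResponse(payload: Array[Byte])

  // Pretend server stream: 10 responses of 1 MiB each.
  private def responses: Iterator[PretendResponse] =
    Iterator.fill(10)(PretendResponse(new Array[Byte](1024 * 1024)))

  // Before: every payload is decoded and held in memory at once.
  def fetchAll(): List[Array[Byte]] =
    responses.map(_.payload).toList

  // After: payloads are produced one at a time as the caller pulls them,
  // so only the element currently being processed needs to stay resident.
  def fetchLazily(): Iterator[Array[Byte]] =
    responses.map(_.payload)

  def main(args: Array[String]): Unit = {
    // A consumer such as Spark's InputPartitionReader.next() advances one
    // element at a time, so the full result set never sits in memory together.
    val totalBytes = fetchLazily().map(_.length.toLong).sum
    println(s"streamed $totalBytes bytes without building a List")
  }
}

The trade-off of returning an Iterator is that the gRPC channel has to stay open until the caller finishes, which is why the fetcher now extends Closeable and the partition reader's close() shuts the channel down instead of a finally block inside fetchData().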

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org