This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 32314bb433  Fix Spark connector empty datatable handling in GRPC reader (#9837)
32314bb433 is described below

commit 32314bb4335a455c474cab3a0710b363bd114cbd
Author: Caner Balci <canerba...@gmail.com>
AuthorDate: Tue Nov 22 11:46:53 2022 -0800

    Fix Spark connector empty datatable handling in GRPC reader (#9837)
---
 .../spark/connector/PinotGrpcServerDataFetcher.scala     | 16 +++++++---------
 .../connector/spark/ExampleSparkPinotConnectorTest.scala | 16 ++++++++++++++++
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
index 78cb960998..f543b5b7c9 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
@@ -22,7 +22,6 @@ import io.grpc.ManagedChannelBuilder
 import org.apache.pinot.common.datatable.{DataTable, DataTableFactory}
 import org.apache.pinot.common.proto.PinotQueryServerGrpc
 import org.apache.pinot.common.proto.Server.ServerRequest
-import org.apache.pinot.connector.spark.exceptions.PinotException
 import org.apache.pinot.connector.spark.utils.Logging
 import org.apache.pinot.spi.config.table.TableType
 
@@ -60,17 +59,16 @@ private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
     logInfo(s"Pinot server total response time in millis: ${System.nanoTime() - requestStartTime}")
 
     try {
-      val dataTables = (for {
+      val dataTables = for {
         serverResponse <- serverResponse.asScala.toList
         if serverResponse.getMetadataMap.get("responseType") == "data"
-      } yield DataTableFactory.getDataTable(serverResponse.getPayload.toByteArray))
-        .filter(_.getNumberOfRows > 0)
+      } yield DataTableFactory.getDataTable(serverResponse.getPayload.toByteArray)
 
-      if (dataTables.isEmpty) {
-        throw PinotException(s"Empty response from ${pinotSplit.serverAndSegments.serverHost}:${pinotSplit.serverAndSegments.serverGrpcPort}")
-      }
-
-      dataTables
+      dataTables.filter(_.getNumberOfRows > 0)
+    } catch {
+      case e: io.grpc.StatusRuntimeException =>
+        logError(s"Caught exception when reading data from ${pinotSplit.serverAndSegments.serverHost}:${pinotSplit.serverAndSegments.serverGrpcPort}: ${e}")
+        throw e
     } finally {
       channel.shutdown()
       logInfo("Pinot server connection closed")
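In short: the fetcher no longer treats an all-empty response as an error. Empty data tables are dropped after decoding, so a split whose segments match no rows now yields an empty partition instead of throwing PinotException, while gRPC transport failures are logged and rethrown. Below is a minimal, runnable Scala sketch of that filtering logic (FakeResponse and its rows field are hypothetical stand-ins for the gRPC server response and a decoded DataTable's row count; this is not connector code):

    // Hypothetical stand-in for a server response: "responseType" metadata
    // plus a pre-decoded row count in place of a real DataTable payload.
    final case class FakeResponse(responseType: String, rows: Int)

    object EmptyTableHandlingSketch {
      def main(args: Array[String]): Unit = {
        val responses = List(
          FakeResponse("metadata", 0), // skipped by the responseType guard
          FakeResponse("data", 0),     // empty data table: previously fatal, now dropped
          FakeResponse("data", 42)     // the only table that survives
        )

        // Mirrors the new for-comprehension: keep only "data" responses.
        val dataTables = for {
          r <- responses
          if r.responseType == "data"
        } yield r.rows // stands in for DataTableFactory.getDataTable(payload)

        // Drop empty tables; an all-empty result is now a valid empty list
        // rather than a thrown PinotException.
        println(dataTables.filter(_ > 0)) // prints: List(42)
      }
    }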
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
index be0adcbf08..f05266990e 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
@@ -42,6 +42,7 @@ object ExampleSparkPinotConnectorTest extends Logging {
     readHybridWithFilters()
     readHybridViaGrpc()
     readRealtimeViaGrpc()
+    readRealtimeWithFilterViaGrpc()
     readHybridWithFiltersViaGrpc()
     readRealtimeWithSelectionColumns()
     applyJustSomeFilters()
@@ -163,6 +164,21 @@ object ExampleSparkPinotConnectorTest extends Logging {
     data.show()
   }
 
+  def readRealtimeWithFilterViaGrpc()(implicit spark: SparkSession): Unit = {
+    import spark.implicits._
+    log.info("## Reading `airlineStats_REALTIME` table... ##")
+    val data = spark.read
+      .format("pinot")
+      .option("table", "airlineStats")
+      .option("tableType", "realtime")
+      .option("useGrpcServer", "true")
+      .load()
+      .filter($"DestWac" === 5)
+      .select($"FlightNum", $"Origin", $"DestStateName")
+
+    data.show()
+  }
+
   def readHybridWithFiltersViaGrpc()(implicit spark: SparkSession): Unit = {
     import spark.implicits._
     log.info("## Reading `airlineStats_OFFLINE` table with filter push down... ##")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org