This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 32314bb433 Fix Spark connector empty datatable handling in GRPC reader (#9837)
32314bb433 is described below

commit 32314bb4335a455c474cab3a0710b363bd114cbd
Author: Caner Balci <canerba...@gmail.com>
AuthorDate: Tue Nov 22 11:46:53 2022 -0800

    Fix Spark connector empty datatable handling in GRPC reader (#9837)
---
 .../spark/connector/PinotGrpcServerDataFetcher.scala     | 16 +++++++---------
 .../connector/spark/ExampleSparkPinotConnectorTest.scala | 16 ++++++++++++++++
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
index 78cb960998..f543b5b7c9 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
@@ -22,7 +22,6 @@ import io.grpc.ManagedChannelBuilder
 import org.apache.pinot.common.datatable.{DataTable, DataTableFactory}
 import org.apache.pinot.common.proto.PinotQueryServerGrpc
 import org.apache.pinot.common.proto.Server.ServerRequest
-import org.apache.pinot.connector.spark.exceptions.PinotException
 import org.apache.pinot.connector.spark.utils.Logging
 import org.apache.pinot.spi.config.table.TableType
 
@@ -60,17 +59,16 @@ private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
     logInfo(s"Pinot server total response time in millis: ${System.nanoTime() 
- requestStartTime}")
 
     try {
-      val dataTables = (for {
+      val dataTables = for {
         serverResponse <- serverResponse.asScala.toList
         if serverResponse.getMetadataMap.get("responseType") == "data"
-      } yield DataTableFactory.getDataTable(serverResponse.getPayload.toByteArray))
-        .filter(_.getNumberOfRows > 0)
+      } yield DataTableFactory.getDataTable(serverResponse.getPayload.toByteArray)
 
-      if (dataTables.isEmpty) {
-        throw PinotException(s"Empty response from ${pinotSplit.serverAndSegments.serverHost}:${pinotSplit.serverAndSegments.serverGrpcPort}")
-      }
-
-      dataTables
+      dataTables.filter(_.getNumberOfRows > 0)
+    } catch {
+      case e: io.grpc.StatusRuntimeException =>
+        logError(s"Caught exception when reading data from 
${pinotSplit.serverAndSegments.serverHost}:${pinotSplit.serverAndSegments.serverGrpcPort}:
 ${e}")
+        throw e
     } finally {
       channel.shutdown()
       logInfo("Pinot server connection closed")
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
index be0adcbf08..f05266990e 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
@@ -42,6 +42,7 @@ object ExampleSparkPinotConnectorTest extends Logging {
     readHybridWithFilters()
     readHybridViaGrpc()
     readRealtimeViaGrpc()
+    readRealtimeWithFilterViaGrpc()
     readHybridWithFiltersViaGrpc()
     readRealtimeWithSelectionColumns()
     applyJustSomeFilters()
@@ -163,6 +164,21 @@ object ExampleSparkPinotConnectorTest extends Logging {
     data.show()
   }
 
+  def readRealtimeWithFilterViaGrpc()(implicit spark: SparkSession): Unit = {
+    import spark.implicits._
+    log.info("## Reading `airlineStats_REALTIME` table... ##")
+    val data = spark.read
+      .format("pinot")
+      .option("table", "airlineStats")
+      .option("tableType", "realtime")
+      .option("useGrpcServer", "true")
+      .load()
+      .filter($"DestWac" === 5)
+      .select($"FlightNum", $"Origin", $"DestStateName")
+
+    data.show()
+  }
+
   def readHybridWithFiltersViaGrpc()(implicit spark: SparkSession): Unit = {
     import spark.implicits._
     log.info("## Reading `airlineStats_OFFLINE` table with filter push down... 
##")

