This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 232b946419  [pinot-spark-connector] Add option to connect using GRPC (#8481)
232b946419 is described below

commit 232b946419d05b785610e9b2daf7467f5f8bee82
Author: Caner Balci <canerba...@gmail.com>
AuthorDate: Wed Apr 27 21:46:37 2022 -0700

    [pinot-spark-connector] Add option to connect using GRPC (#8481)
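    With this change, switching the connector to the gRPC read path is a one-option change on
    the Spark side. A minimal usage sketch, assuming an existing SparkSession named spark and
    the airlineStats quickstart table used by the bundled examples:

        // assumes a running Pinot cluster whose servers have gRPC enabled
        val data = spark.read
          .format("pinot")
          .option("table", "airlineStats")
          .option("tableType", "OFFLINE")
          .option("useGrpcServer", "true")   // new flag introduced by this commit
          .load()

        data.show()

    useGrpcServer defaults to false. When it is enabled, the connector looks up each server's
    gRPC port through the controller's /instances/<instance> endpoint and streams segment data
    via PinotGrpcServerDataFetcher instead of PinotServerDataFetcher, so the target servers must
    be started with pinot.server.grpc.enable=true (the HybridQuickstart change below sets this
    together with pinot.server.grpc.port=8090).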
---
 pinot-connectors/pinot-spark-connector/pom.xml     | 48 ++++++++++++++
 .../spark/connector/PinotClusterClient.scala       | 27 ++++++++
 .../connector/PinotGrpcServerDataFetcher.scala     | 77 ++++++++++++++++++++++
 .../spark/connector/PinotServerDataFetcher.scala   |  2 +-
 .../connector/spark/connector/PinotSplitter.scala  | 37 +++++------
 .../datasource/PinotDataSourceReadOptions.scala    |  9 ++-
 .../spark/datasource/PinotDataSourceReader.scala   | 11 +++-
 .../datasource/PinotInputPartitionReader.scala     | 16 +++--
 .../spark/ExampleSparkPinotConnectorTest.scala     | 32 ++++++++-
 .../spark/connector/PinotSplitterTest.scala        | 71 ++++++++++++++------
 .../PinotDataSourceReadOptionsTest.scala           |  8 ++-
 .../org/apache/pinot/tools/HybridQuickstart.java   |  9 +++
 12 files changed, 294 insertions(+), 53 deletions(-)

diff --git a/pinot-connectors/pinot-spark-connector/pom.xml b/pinot-connectors/pinot-spark-connector/pom.xml
index bf969a0215..14a4e667e9 100644
--- a/pinot-connectors/pinot-spark-connector/pom.xml
+++ b/pinot-connectors/pinot-spark-connector/pom.xml
@@ -140,6 +140,19 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty-shaded</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
@@ -167,6 +180,41 @@
   <build>
     <plugins>
       <!-- scala build -->
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <version>0.6.1</version>
+        <configuration>
+          <protocArtifact>com.google.protobuf:protoc:3.19.2:exe:${os.detected.classifier}</protocArtifact>
+          <pluginId>grpc-java</pluginId>
+          <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.44.1:exe:${os.detected.classifier}</pluginArtifact>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>compile-custom</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>assemble-all</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <groupId>net.alchim31.maven</groupId>
         <artifactId>scala-maven-plugin</artifactId>
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
index 8ccbd437eb..48abd38c95 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
@@ -40,6 +40,7 @@ private[pinot] object PinotClusterClient extends Logging {
   private val TABLE_BROKER_INSTANCES_TEMPLATE = "http://%s//brokers/tables/%s"
   private val TIME_BOUNDARY_TEMPLATE = "http://%s/debug/timeBoundary/%s"
   private val ROUTING_TABLE_TEMPLATE = "http://%s/debug/routingTable/sql?query=%s"
+  private val INSTANCES_API_TEMPLATE = "http://%s/instances/%s"

   def getTableSchema(controllerUrl: String, tableName: String): Schema = {
     val rawTableName = TableNameBuilder.extractRawTableName(tableName)
@@ -176,6 +177,27 @@ private[pinot] object PinotClusterClient extends Logging {
     routingTables
   }

+  /**
+   * Get host information for a Pinot instance
+   *
+   * @return InstanceInfo
+   */
+  def getInstanceInfo(controllerUrl: String, instance: String): InstanceInfo = {
+    Try {
+      val uri = new URI(String.format(INSTANCES_API_TEMPLATE, controllerUrl, instance))
+      val response = HttpUtils.sendGetRequest(uri)
+      decodeTo[InstanceInfo](response)
+    } match {
+      case Success(decodedResponse) =>
+        decodedResponse
+      case Failure(exception) =>
+        throw PinotException(
+          s"An error occurred while reading instance info for: '$instance'",
+          exception
+        )
+    }
+  }
+
   private def getRoutingTableForQuery(brokerUrl: String, sql: String): Map[String, List[String]] = {
     Try {
       val encodedPqlQueryParam = URLEncoder.encode(sql, "UTF-8")
@@ -201,3 +223,8 @@ private[pinot] case class TimeBoundaryInfo(timeColumn: String, timeValue: String

   def getRealtimePredicate: String = s"$timeColumn >= $timeValue"
 }
+
+private[pinot] case class InstanceInfo(instanceName: String,
+                                       hostName: String,
+                                       port: String,
+                                       grpcPort: Int)
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
new file mode 100644
index 0000000000..ff63fa8ab2
--- /dev/null
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import io.grpc.ManagedChannelBuilder
+import org.apache.pinot.common.proto.PinotQueryServerGrpc
+import org.apache.pinot.common.proto.Server.ServerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.common.datatable.DataTableFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Data fetcher from Pinot Grpc server with specific segments.
+ * Eg: offline-server1: segment1, segment2, segment3
+ */
+private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
+  extends Logging {
+
+  private val channel = ManagedChannelBuilder
+    .forAddress(pinotSplit.serverAndSegments.serverHost, pinotSplit.serverAndSegments.serverGrpcPort)
+    .usePlaintext()
+    .asInstanceOf[ManagedChannelBuilder[_]].build()
+  private val pinotServerBlockingStub = PinotQueryServerGrpc.newBlockingStub(channel)
+
+  def fetchData(): List[DataTable] = {
+    val requestStartTime = System.nanoTime()
+    val request = ServerRequest.newBuilder()
+      .putMetadata("enableStreaming", "true")
+      .addAllSegments(pinotSplit.serverAndSegments.segments.asJava)
+      .setSql(pinotSplit.generatedSQLs.offlineSelectQuery)
+      .build()
+    val serverResponse = pinotServerBlockingStub.submit(request)
+    logInfo(s"Pinot server total response time in nanos: ${System.nanoTime() - requestStartTime}")
+
+    try {
+      val dataTables = (for {
+        serverResponse <- serverResponse.asScala.toList
+        if serverResponse.getMetadataMap.get("responseType") == "data"
+      } yield DataTableFactory.getDataTable(serverResponse.getPayload().toByteArray))
+        .filter(_.getNumberOfRows > 0)
+
+      if (dataTables.isEmpty) {
+        throw PinotException(s"Empty response from ${pinotSplit.serverAndSegments.serverHost}:${pinotSplit.serverAndSegments.serverGrpcPort}")
+      }
+
+      dataTables
+    } finally {
+      channel.shutdown()
+      logInfo("Pinot server connection closed")
+    }
+  }
+}
+
+object PinotGrpcServerDataFetcher {
+  def apply(pinotSplit: PinotSplit): PinotGrpcServerDataFetcher = {
+    new PinotGrpcServerDataFetcher(pinotSplit)
+  }
+}
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
index 900ee35c50..651b87d718 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
@@ -95,7 +95,7 @@ private[pinot] class PinotServerDataFetcher(
     val instanceConfig = new InstanceConfig(nullZkId)
     instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost)
     instanceConfig.setPort(pinotSplit.serverAndSegments.serverPort)
-    // TODO: support grpc and netty-sec
+    // TODO: support netty-sec
     val serverInstance = new ServerInstance(instanceConfig)
     Map(
       serverInstance -> pinotSplit.serverAndSegments.segments.asJava
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala
index ed1c2cc750..1445b3c767 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala
@@ -18,10 +18,8 @@
  */
 package org.apache.pinot.connector.spark.connector

-import java.util.regex.{Matcher, Pattern}
-
 import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
-import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
 import org.apache.pinot.connector.spark.utils.Logging
 import org.apache.pinot.spi.config.table.TableType
@@ -47,45 +45,43 @@ import org.apache.pinot.spi.config.table.TableType
  * - partition6: offlineServer10 -> segment20
  */
 private[pinot] object PinotSplitter extends Logging {
-  private val PINOT_SERVER_PATTERN = Pattern.compile("Server_(.*)_(\\d+)")

   def generatePinotSplits(
       generatedSQLs: GeneratedSQLs,
       routingTable: Map[TableType, Map[String, List[String]]],
-      segmentsPerSplit: Int): List[PinotSplit] = {
+      instanceInfoReader: String => InstanceInfo,
+      readParameters: PinotDataSourceReadOptions): List[PinotSplit] = {
     routingTable.flatMap {
       case (tableType, serversToSegments) =>
         serversToSegments
-          .map { case (server, segments) => parseServerInput(server, segments) }
+          .map { case (server, segments) => (instanceInfoReader(server), segments) }
           .flatMap {
-            case (matcher, segments) =>
+            case (instanceInfo, segments) =>
               createPinotSplitsFromSubSplits(
                 tableType,
                 generatedSQLs,
-                matcher,
+                instanceInfo,
                 segments,
-                segmentsPerSplit
-              )
+                readParameters.segmentsPerSplit)
           }
     }.toList
   }

-  private def parseServerInput(server: String, segments: List[String]): (Matcher, List[String]) = {
-    val matcher = PINOT_SERVER_PATTERN.matcher(server)
-    if (matcher.matches() && matcher.groupCount() == 2) matcher -> segments
-    else throw PinotException(s"'$server' did not match!?")
-  }
-
   private def createPinotSplitsFromSubSplits(
       tableType: TableType,
       generatedSQLs: GeneratedSQLs,
-      serverMatcher: Matcher,
+      instanceInfo: InstanceInfo,
       segments: List[String],
       segmentsPerSplit: Int): Iterator[PinotSplit] = {
-    val serverHost = serverMatcher.group(1)
-    val serverPort = serverMatcher.group(2)
     val maxSegmentCount = Math.min(segments.size, segmentsPerSplit)
     segments.grouped(maxSegmentCount).map { subSegments =>
-      val serverAndSegments =
-        PinotServerAndSegments(serverHost, serverPort, subSegments, tableType)
+      val serverAndSegments = {
+        PinotServerAndSegments(instanceInfo.hostName,
+          instanceInfo.port,
+          instanceInfo.grpcPort,
+          subSegments,
+          tableType)
+      }
       PinotSplit(generatedSQLs, serverAndSegments)
     }
   }
@@ -100,6 +94,7 @@ private[pinot] case class PinotSplit(
 private[pinot] case class PinotServerAndSegments(
     serverHost: String,
     serverPort: String,
+    serverGrpcPort: Int,
     segments: List[String],
     serverType: TableType) {
   override def toString: String = s"$serverHost:$serverPort($serverType)"
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala
index ad9e8c8025..1aa643f99a 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala
@@ -36,10 +36,12 @@ object PinotDataSourceReadOptions {
   val CONFIG_USE_PUSH_DOWN_FILTERS = "usePushDownFilters"
   val CONFIG_SEGMENTS_PER_SPLIT = "segmentsPerSplit"
   val CONFIG_PINOT_SERVER_TIMEOUT_MS = "pinotServerTimeoutMs"
+  val CONFIG_USE_GRPC_SERVER = "useGrpcServer"
   private[pinot] val DEFAULT_CONTROLLER: String = "localhost:9000"
   private[pinot] val DEFAULT_USE_PUSH_DOWN_FILTERS: Boolean = true
   private[pinot] val DEFAULT_SEGMENTS_PER_SPLIT: Int = 3
   private[pinot] val DEFAULT_PINOT_SERVER_TIMEOUT_MS: Long = 10000
+  private[pinot] val DEFAULT_USE_GRPC_SERVER: Boolean = false

   private[pinot] val tableTypes = Seq("OFFLINE", "REALTIME", "HYBRID")
@@ -75,6 +77,7 @@ object PinotDataSourceReadOptions {
     val segmentsPerSplit = options.getInt(CONFIG_SEGMENTS_PER_SPLIT, DEFAULT_SEGMENTS_PER_SPLIT)
     val pinotServerTimeoutMs = options.getLong(CONFIG_PINOT_SERVER_TIMEOUT_MS, DEFAULT_PINOT_SERVER_TIMEOUT_MS)
+    val useGrpcServer = options.getBoolean(CONFIG_USE_GRPC_SERVER, DEFAULT_USE_GRPC_SERVER)

     PinotDataSourceReadOptions(
       tableName,
@@ -83,7 +86,8 @@ object PinotDataSourceReadOptions {
       broker,
       usePushDownFilters,
       segmentsPerSplit,
-      pinotServerTimeoutMs
+      pinotServerTimeoutMs,
+      useGrpcServer
     )
   }
 }
@@ -96,4 +100,5 @@ private[pinot] case class PinotDataSourceReadOptions(
     broker: String,
     usePushDownFilters: Boolean,
     segmentsPerSplit: Int,
-    pinotServerTimeoutMs: Long)
+    pinotServerTimeoutMs: Long,
+    useGrpcServer: Boolean)
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
index 6b9d6b291f..f806a2b268 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.sources.v2.reader.{
 import org.apache.spark.sql.types._

 import scala.collection.JavaConverters._
+import scala.collection.mutable.Map

 /**
  * Spark-Pinot datasource reader to read metadata and create partition splits.
@@ -80,8 +81,16 @@ class PinotDataSourceReader(options: DataSourceOptions, userSchema: Option[Struc
     val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, generatedSQLs)

+    val instanceInfo : Map[String, InstanceInfo] = Map()
+    val instanceInfoReader = (instance:String) => { // cached reader to reduce network round trips
+      instanceInfo.getOrElseUpdate(
+        instance,
+        PinotClusterClient.getInstanceInfo(readParameters.controller, instance)
+      )
+    }
+
     PinotSplitter
-      .generatePinotSplits(generatedSQLs, routingTable, readParameters.segmentsPerSplit)
+      .generatePinotSplits(generatedSQLs, routingTable, instanceInfoReader, readParameters)
       .zipWithIndex
       .map {
         case (pinotSplit, partitionId) =>
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
index 9a659b3ba1..de42b7a6cf 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.connector.spark.datasource

-import org.apache.pinot.connector.spark.connector.{PinotServerDataFetcher, PinotSplit, PinotUtils}
+import org.apache.pinot.connector.spark.connector.{PinotGrpcServerDataFetcher, PinotServerDataFetcher, PinotSplit, PinotUtils}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
 import org.apache.spark.sql.types.StructType
@@ -51,9 +51,15 @@ class PinotInputPartitionReader(
   override def close(): Unit = {}

   private def fetchDataAndConvertToInternalRows(): Iterator[InternalRow] = {
-    PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions)
-      .fetchData()
-      .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
-      .toIterator
+    if (dataSourceOptions.useGrpcServer)
+      PinotGrpcServerDataFetcher(pinotSplit)
+        .fetchData()
+        .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
+        .toIterator
+    else
+      PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions)
+        .fetchData()
+        .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
+        .toIterator
   }
 }
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
index b49e4fcfa1..0909f920a8 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
@@ -39,8 +39,9 @@ object ExampleSparkPinotConnectorTest extends Logging {
     readOffline()
     readHybrid()
     readHybridWithSpecificSchema()
-    readOfflineWithFilters()
     readHybridWithFilters()
+    readHybridViaGrpc()
+    readHybridWithFiltersViaGrpc()
     readRealtimeWithSelectionColumns()
     applyJustSomeFilters()
   }
@@ -137,6 +138,35 @@ object ExampleSparkPinotConnectorTest extends Logging {
     data.show()
   }

+  def readHybridViaGrpc()(implicit spark: SparkSession): Unit = {
+    log.info("## Reading `airlineStats_OFFLINE` table... ##")
+    val data = spark.read
+      .format("pinot")
+      .option("table", "airlineStats")
+      .option("tableType", "offline")
+      .option("useGrpcServer", "true")
+      .load()
+
+    data.show()
+    print(data.count())
+  }
+
+  def readHybridWithFiltersViaGrpc()(implicit spark: SparkSession): Unit = {
+    import spark.implicits._
+    log.info("## Reading `airlineStats_OFFLINE` table with filter push down... ##")
+    val data = spark.read
+      .format("pinot")
+      .option("table", "airlineStats")
+      .option("tableType", "OFFLINE")
+      .option("useGrpcServer", "true")
+      .load()
+      .filter($"DestStateName" === "Florida")
+
+    data.show()
+    print(data.count())
+  }
+
   def applyJustSomeFilters()(implicit spark: SparkSession): Unit = {
     import spark.implicits._
     log.info("## Reading `airlineStats_OFFLINE and airlineStats_REALTIME` tables with filter push down... ##")
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala
index 02061218b4..161a17fea3 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala
@@ -20,14 +20,20 @@ package org.apache.pinot.connector.spark.connector

 import org.apache.pinot.connector.spark.BaseTest
 import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
-import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
 import org.apache.pinot.spi.config.table.TableType
+import java.util.regex.Pattern

 /**
  * Test num of Spark partitions by routing table and input configs.
  */
 class PinotSplitterTest extends BaseTest {
   private val generatedPql = GeneratedSQLs("tbl", None, "", "")
+  private val mockInstanceInfoReader = (server: String) => {
+    val matcher = Pattern.compile("Server_(.*)_(\\d+)").matcher(server)
+    matcher.matches()
+    InstanceInfo(server, matcher.group(1), matcher.group(2), -1)
+  }

   private val routingTable = Map(
     TableType.OFFLINE -> Map(
@@ -41,26 +47,38 @@ class PinotSplitterTest extends BaseTest {
     )
   )

+  private val getReadOptionsWithSegmentsPerSplit = (segmentsPerSplit: Int) => {
+    new PinotDataSourceReadOptions(
+      "tableName",
+      Option(TableType.OFFLINE),
+      "controller",
+      "broker",
+      false,
+      segmentsPerSplit,
+      1000,
+      false)
+  }
+
   test("Total 5 partition splits should be created for maxNumSegmentPerServerRequest = 3") {
-    val maxNumSegmentPerServerRequest = 3
+    val readOptions = getReadOptionsWithSegmentsPerSplit(3)
     val splitResults =
-      PinotSplitter.generatePinotSplits(generatedPql, routingTable, maxNumSegmentPerServerRequest)
+      PinotSplitter.generatePinotSplits(generatedPql, routingTable, mockInstanceInfoReader, readOptions)

     splitResults.size shouldEqual 5
   }

   test("Total 5 partition splits should be created for maxNumSegmentPerServerRequest = 90") {
-    val maxNumSegmentPerServerRequest = 90
+    val readOptions = getReadOptionsWithSegmentsPerSplit(90)
     val splitResults =
-      PinotSplitter.generatePinotSplits(generatedPql, routingTable, maxNumSegmentPerServerRequest)
+      PinotSplitter.generatePinotSplits(generatedPql, routingTable, mockInstanceInfoReader, readOptions)

     splitResults.size shouldEqual 5
   }

   test("Total 10 partition splits should be created for maxNumSegmentPerServerRequest = 1") {
-    val maxNumSegmentPerServerRequest = 1
+    val readOptions = getReadOptionsWithSegmentsPerSplit(1)
     val splitResults =
-      PinotSplitter.generatePinotSplits(generatedPql, routingTable, maxNumSegmentPerServerRequest)
+      PinotSplitter.generatePinotSplits(generatedPql, routingTable, mockInstanceInfoReader, readOptions)

     splitResults.size shouldEqual 10
   }
@@ -69,31 +87,46 @@ class PinotSplitterTest extends BaseTest {
     val inputRoutingTable = Map(
       TableType.REALTIME -> Map("Server_192.168.1.100_9000" -> List("segment1"))
     )
+    val readOptions = getReadOptionsWithSegmentsPerSplit(5)

-    val splitResults = PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, 5)
+    val splitResults = PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, mockInstanceInfoReader, readOptions)

     val expectedOutput = List(
       PinotSplit(
         generatedPql,
-        PinotServerAndSegments("192.168.1.100", "9000", List("segment1"), TableType.REALTIME)
+        PinotServerAndSegments("192.168.1.100", "9000", -1, List("segment1"), TableType.REALTIME)
       )
     )

     expectedOutput should contain theSameElementsAs splitResults
   }

-  test("GeneratePinotSplits method should throw exception due to wrong input Server_HOST_PORT") {
+  test("GeneratePinotSplits with Grpc port reading enabled") {
     val inputRoutingTable = Map(
-      TableType.REALTIME -> Map(
-        "Server_192.168.1.100_9000" -> List("segment1"),
-        "Server_192.168.2.100" -> List("segment5")
-      )
+      TableType.REALTIME -> Map("Server_192.168.1.100_9000" -> List("segment1"))
     )
-
-    val exception = intercept[PinotException] {
-      PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, 5)
+    val inputReadOptions = new PinotDataSourceReadOptions(
+      "tableName",
+      Option(TableType.REALTIME),
+      "controller",
+      "broker",
+      false,
+      1,
+      1000,
+      true)
+
+    val inputGrpcPortReader = (server: String) => {
+      InstanceInfo(server, "192.168.1.100", "9000", 8090)
     }

-    exception.getMessage shouldEqual "'Server_192.168.2.100' did not match!?"
-  }
+    val splitResults =
+      PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, inputGrpcPortReader, inputReadOptions)
+
+    val expectedOutput = List(
+      PinotSplit(
+        generatedPql,
+        PinotServerAndSegments("192.168.1.100", "9000", 8090, List("segment1"), TableType.REALTIME)
+      )
+    )
+
+    expectedOutput should contain theSameElementsAs splitResults
+  }
 }
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala
index 83baf3fe3b..f395e0c543 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.connector.spark.datasource

-import org.apache.pinot.connector.spark.BaseTest
+import org.apache.pinot.connector.spark.{BaseTest, datasource}
 import org.apache.pinot.connector.spark.exceptions.PinotException
 import org.apache.spark.sql.sources.v2.DataSourceOptions
@@ -36,7 +36,8 @@ class PinotDataSourceReadOptionsTest extends BaseTest {
       PinotDataSourceReadOptions.CONFIG_CONTROLLER -> "localhost:9000",
       PinotDataSourceReadOptions.CONFIG_BROKER -> "localhost:8000",
       PinotDataSourceReadOptions.CONFIG_SEGMENTS_PER_SPLIT -> "1",
-      PinotDataSourceReadOptions.CONFIG_USE_PUSH_DOWN_FILTERS -> "false"
+      PinotDataSourceReadOptions.CONFIG_USE_PUSH_DOWN_FILTERS -> "false",
+      PinotDataSourceReadOptions.CONFIG_USE_GRPC_SERVER -> "false",
     )

     val datasourceOptions = new DataSourceOptions(options.asJava)
@@ -50,7 +51,8 @@ class PinotDataSourceReadOptionsTest extends BaseTest {
       "localhost:8000",
       false,
       1,
-      10000
+      10000,
+      false
     )

     pinotDataSourceReadOptions shouldEqual expected
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index 7c738a40c5..61ec1b3a9a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -26,7 +26,9 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -64,6 +66,13 @@ public class HybridQuickstart extends QuickStartBase {
     PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
   }

+  public Map<String, Object> getConfigOverrides() {
+    Map<String, Object> overrides = new HashMap<>();
+    overrides.put("pinot.server.grpc.enable", "true");
+    overrides.put("pinot.server.grpc.port", "8090");
+    return overrides;
+  }
+
   private QuickstartTableRequest prepareTableRequest(File baseDir)
       throws IOException {
     _schemaFile = new File(baseDir, "airlineStats_schema.json");

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org