Jackie-Jiang commented on a change in pull request #7838: URL: https://github.com/apache/pinot/pull/7838#discussion_r760672320
########## File path: pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java ########## @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.broker.api.RequestStatistics; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.RoutingManager; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.utils.grpc.GrpcQueryClient; +import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; +import org.apache.pinot.common.utils.helix.TableCache; +import org.apache.pinot.core.query.reduce.StreamingReduceService; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.core.transport.TlsConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +/** + * The <code>SingleConnectionBrokerRequestHandler</code> class is a thread-safe broker request handler using a single + * connection per server to route the queries. + */ +@ThreadSafe +public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final int DEFAULT_MAXIMUM_REQUEST_ATTEMPT = 10; + + private final GrpcQueryClient.Config _grpcConfig; + private final Map<String, AtomicInteger> _concurrentQueriesCountMap = new ConcurrentHashMap<>(); + private final int _maxBacklogPerServer; + private final int _grpcBrokerThreadPoolSize; + private final StreamingReduceService _streamingReduceService; + + private transient PinotStreamingQueryClient _streamingQueryClient; + + public GrpcBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + BrokerMetrics brokerMetrics, TlsConfig tlsConfig) { + super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + _grpcConfig = buildGrpcQueryClientConfig(config); + + // parse and fill constants + _maxBacklogPerServer = Integer.parseInt(_config.getProperty("MAXIMUM_BACKLOG_PER_SERVER", "1000")); + _grpcBrokerThreadPoolSize = Integer.parseInt(_config.getProperty("GRPC_BROKER_THREAD_POOL_SIZE", "10")); + + // create streaming query client + _streamingQueryClient = new PinotStreamingQueryClient(_grpcConfig); + + // create streaming reduce service + _streamingReduceService = new StreamingReduceService(config); + } + + @Override + public void start() { + } + + @Override + public synchronized void shutDown() { + _streamingReduceService.shutDown(); + } + + @Override + protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, + @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, + long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics) + throws Exception { + assert offlineBrokerRequest != null || realtimeBrokerRequest != null; + + String rawTableName = TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName()); + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>(); + // Making request to a streaming server response. + + if (offlineBrokerRequest != null) { + // to offline servers. + assert offlineRoutingTable != null; + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponses = + streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.OFFLINE, + offlineBrokerRequest, offlineRoutingTable, timeoutMs, true, 1); + responseMap.putAll(serverResponses); + } + if (realtimeBrokerRequest != null) { + // to realtime servers. + assert realtimeRoutingTable != null; + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponses = + streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.REALTIME, + realtimeBrokerRequest, realtimeRoutingTable, timeoutMs, true, 1); + responseMap.putAll(serverResponses); + } + BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse( + originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics); + return brokerResponse; + } + + /** + * Query pinot server for data table. + */ + public Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> streamingQueryToPinotServer( + final long requestId, + final String brokerHost, + final String rawTableName, + final TableType tableType, + BrokerRequest brokerRequest, + Map<ServerInstance, List<String>> routingTable, + long connectionTimeoutInMillis, + boolean ignoreEmptyResponses, + int pinotRetryCount) { + // Retries will all hit the same server because the routing decision has already been made by the pinot broker + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap = new HashMap<>(); + for (Map.Entry<ServerInstance, List<String>> routingEntry : routingTable.entrySet()) { + ServerInstance serverInstance = routingEntry.getKey(); + List<String> segments = routingEntry.getValue(); + String serverHost = serverInstance.getHostname(); + int port = serverInstance.getGrpcPort(); + // ensure concurrent request count from the same grpc server cannot exceed maximum + if (!_concurrentQueriesCountMap.containsKey(serverHost)) { + _concurrentQueriesCountMap.put(serverHost, new AtomicInteger(0)); + } + int concurrentQueryNum = _concurrentQueriesCountMap.get(serverHost).get(); Review comment: This is not thread safe ```suggestion AtomicInteger concurrentQueryCount = _concurrentQueriesCountMap.computeIfAbsent(serverHost, k -> new AtomicInteger(0)); ``` Since we don't have throttling implemented yet, suggest removing the map for now, and add it back when we add the throttling feature. Currently there is no routing layer throttling even for the netty based request handler ########## File path: pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java ########## @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.broker.api.RequestStatistics; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.RoutingManager; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.utils.grpc.GrpcQueryClient; +import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; +import org.apache.pinot.common.utils.helix.TableCache; +import org.apache.pinot.core.query.reduce.StreamingReduceService; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.core.transport.TlsConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +/** + * The <code>SingleConnectionBrokerRequestHandler</code> class is a thread-safe broker request handler using a single + * connection per server to route the queries. + */ +@ThreadSafe +public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final int DEFAULT_MAXIMUM_REQUEST_ATTEMPT = 10; + + private final GrpcQueryClient.Config _grpcConfig; + private final Map<String, AtomicInteger> _concurrentQueriesCountMap = new ConcurrentHashMap<>(); + private final int _maxBacklogPerServer; + private final int _grpcBrokerThreadPoolSize; + private final StreamingReduceService _streamingReduceService; + + private transient PinotStreamingQueryClient _streamingQueryClient; + + public GrpcBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + BrokerMetrics brokerMetrics, TlsConfig tlsConfig) { + super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + _grpcConfig = buildGrpcQueryClientConfig(config); + + // parse and fill constants + _maxBacklogPerServer = Integer.parseInt(_config.getProperty("MAXIMUM_BACKLOG_PER_SERVER", "1000")); + _grpcBrokerThreadPoolSize = Integer.parseInt(_config.getProperty("GRPC_BROKER_THREAD_POOL_SIZE", "10")); + + // create streaming query client + _streamingQueryClient = new PinotStreamingQueryClient(_grpcConfig); + + // create streaming reduce service + _streamingReduceService = new StreamingReduceService(config); + } + + @Override + public void start() { + } + + @Override + public synchronized void shutDown() { + _streamingReduceService.shutDown(); + } + + @Override + protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, + @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, + long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics) + throws Exception { + assert offlineBrokerRequest != null || realtimeBrokerRequest != null; + + String rawTableName = TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName()); + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>(); + // Making request to a streaming server response. + + if (offlineBrokerRequest != null) { + // to offline servers. + assert offlineRoutingTable != null; + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponses = + streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.OFFLINE, + offlineBrokerRequest, offlineRoutingTable, timeoutMs, true, 1); + responseMap.putAll(serverResponses); + } + if (realtimeBrokerRequest != null) { + // to realtime servers. + assert realtimeRoutingTable != null; + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponses = + streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.REALTIME, + realtimeBrokerRequest, realtimeRoutingTable, timeoutMs, true, 1); + responseMap.putAll(serverResponses); + } + BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse( + originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics); + return brokerResponse; + } + + /** + * Query pinot server for data table. + */ + public Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> streamingQueryToPinotServer( + final long requestId, + final String brokerHost, + final String rawTableName, + final TableType tableType, + BrokerRequest brokerRequest, + Map<ServerInstance, List<String>> routingTable, + long connectionTimeoutInMillis, + boolean ignoreEmptyResponses, + int pinotRetryCount) { + // Retries will all hit the same server because the routing decision has already been made by the pinot broker + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap = new HashMap<>(); + for (Map.Entry<ServerInstance, List<String>> routingEntry : routingTable.entrySet()) { + ServerInstance serverInstance = routingEntry.getKey(); + List<String> segments = routingEntry.getValue(); + String serverHost = serverInstance.getHostname(); + int port = serverInstance.getGrpcPort(); + // ensure concurrent request count from the same grpc server cannot exceed maximum + if (!_concurrentQueriesCountMap.containsKey(serverHost)) { + _concurrentQueriesCountMap.put(serverHost, new AtomicInteger(0)); + } + int concurrentQueryNum = _concurrentQueriesCountMap.get(serverHost).get(); + if (concurrentQueryNum > _maxBacklogPerServer) { + ProcessingException processingException = new ProcessingException(QueryException.UNKNOWN_ERROR_CODE); + processingException.setMessage("Reaching server query max backlog size is - " + _maxBacklogPerServer); + throw new RuntimeException(processingException); + } + // TODO: enable throttling on per host bases. +// _concurrentQueriesCountMap.get(serverHost).incrementAndGet(); + Iterator<Server.ServerResponse> streamingResponse = _streamingQueryClient.submit(serverHost, port, + new GrpcRequestBuilder() + .setSegments(segments) + .setBrokerRequest(brokerRequest) + .setEnableStreaming(true)); + serverResponseMap.put(serverInstance.toServerRoutingInstance(tableType), streamingResponse); + } + return serverResponseMap; + } + + private static long makeGrpcRequestId(long requestId, int requestAttemptId) { + return requestId * DEFAULT_MAXIMUM_REQUEST_ATTEMPT + requestAttemptId; + } + + // return empty config for now + private GrpcQueryClient.Config buildGrpcQueryClientConfig(PinotConfiguration config) { + return new GrpcQueryClient.Config(); + } + + public static class PinotStreamingQueryClient { + private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new HashMap<>(); Review comment: This is not thread safe. We need to use a concurrent map, and call `computeIfAbsent()` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java ########## @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.reduce; + +import java.util.Iterator; +import java.util.Map; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.datatable.DataTableFactory; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The <code>StreamingReduceService</code> class provides service to reduce grpc response gathered from multiple servers + * to {@link BrokerResponseNative}. + */ +@ThreadSafe +public class StreamingReduceService extends BaseReduceService { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamingReduceService.class); + + public StreamingReduceService(PinotConfiguration config) { + super(config); + } + + public BrokerResponseNative reduceOnStreamResponse(BrokerRequest brokerRequest, + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs, + @Nullable BrokerMetrics brokerMetrics) { + if (serverResponseMap.isEmpty()) { + // Empty response. + return BrokerResponseNative.empty(); + } + + // prepare contextual info for reduce. + PinotQuery pinotQuery = brokerRequest.getPinotQuery(); + Map<String, String> queryOptions = + pinotQuery != null ? pinotQuery.getQueryOptions() : brokerRequest.getQueryOptions(); + boolean enableTrace = + queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)); + + QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); + + String tableName = brokerRequest.getQuerySource().getTableName(); + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + + // initialize empty response. + BrokerMetricsAggregator aggregator = new BrokerMetricsAggregator(enableTrace); + + // Process server response. + DataTableReducerContext dataTableReducerContext = + new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs, + _groupByTrimThreshold); + StreamingReducer streamingReducer = ResultReducerFactory.getStreamingReducer(queryContext); + + streamingReducer.init(dataTableReducerContext); + + // TODO: use concurrent process instead of determine-order for-loop. Review comment: Suggest handling it in this PR as this is fundamental for the streaming reducing. Both the reducer and the metric aggregator should be thread safe ########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java ########## @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.reduce; + +import java.util.Iterator; +import java.util.Map; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.datatable.DataTableFactory; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The <code>StreamingReduceService</code> class provides service to reduce grpc response gathered from multiple servers + * to {@link BrokerResponseNative}. + */ +@ThreadSafe +public class StreamingReduceService extends BaseReduceService { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamingReduceService.class); + + public StreamingReduceService(PinotConfiguration config) { + super(config); + } + + public BrokerResponseNative reduceOnStreamResponse(BrokerRequest brokerRequest, + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs, + @Nullable BrokerMetrics brokerMetrics) { + if (serverResponseMap.isEmpty()) { + // Empty response. + return BrokerResponseNative.empty(); + } + + // prepare contextual info for reduce. + PinotQuery pinotQuery = brokerRequest.getPinotQuery(); + Map<String, String> queryOptions = + pinotQuery != null ? pinotQuery.getQueryOptions() : brokerRequest.getQueryOptions(); + boolean enableTrace = + queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)); + + QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); + + String tableName = brokerRequest.getQuerySource().getTableName(); + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + + // initialize empty response. + BrokerMetricsAggregator aggregator = new BrokerMetricsAggregator(enableTrace); + + // Process server response. + DataTableReducerContext dataTableReducerContext = + new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs, + _groupByTrimThreshold); + StreamingReducer streamingReducer = ResultReducerFactory.getStreamingReducer(queryContext); + + streamingReducer.init(dataTableReducerContext); + + // TODO: use concurrent process instead of determine-order for-loop. + for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>> entry: serverResponseMap.entrySet()) { + Iterator<Server.ServerResponse> streamingResponses = entry.getValue(); + while (streamingResponses.hasNext()) { + Server.ServerResponse streamingResponse = streamingResponses.next(); + try { + DataTable dataTable = DataTableFactory.getDataTable(streamingResponse.getPayload().asReadOnlyByteBuffer()); + streamingReducer.reduce(entry.getKey(), dataTable); Review comment: Reducer should only reduce on data block, aggregator should only aggregate on metadata block, let's handle them separately ########## File path: pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java ########## @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.broker.api.RequestStatistics; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.RoutingManager; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.utils.grpc.GrpcQueryClient; +import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; +import org.apache.pinot.common.utils.helix.TableCache; +import org.apache.pinot.core.query.reduce.StreamingReduceService; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.core.transport.TlsConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +/** + * The <code>SingleConnectionBrokerRequestHandler</code> class is a thread-safe broker request handler using a single + * connection per server to route the queries. + */ +@ThreadSafe +public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final int DEFAULT_MAXIMUM_REQUEST_ATTEMPT = 10; + + private final GrpcQueryClient.Config _grpcConfig; + private final Map<String, AtomicInteger> _concurrentQueriesCountMap = new ConcurrentHashMap<>(); + private final int _maxBacklogPerServer; + private final int _grpcBrokerThreadPoolSize; + private final StreamingReduceService _streamingReduceService; + + private transient PinotStreamingQueryClient _streamingQueryClient; + + public GrpcBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + BrokerMetrics brokerMetrics, TlsConfig tlsConfig) { + super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + _grpcConfig = buildGrpcQueryClientConfig(config); + + // parse and fill constants + _maxBacklogPerServer = Integer.parseInt(_config.getProperty("MAXIMUM_BACKLOG_PER_SERVER", "1000")); + _grpcBrokerThreadPoolSize = Integer.parseInt(_config.getProperty("GRPC_BROKER_THREAD_POOL_SIZE", "10")); + + // create streaming query client + _streamingQueryClient = new PinotStreamingQueryClient(_grpcConfig); + + // create streaming reduce service + _streamingReduceService = new StreamingReduceService(config); + } + + @Override + public void start() { + } + + @Override + public synchronized void shutDown() { + _streamingReduceService.shutDown(); + } + + @Override + protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, + @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, + long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics) + throws Exception { + assert offlineBrokerRequest != null || realtimeBrokerRequest != null; + + String rawTableName = TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName()); + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>(); + // Making request to a streaming server response. + + if (offlineBrokerRequest != null) { + // to offline servers. + assert offlineRoutingTable != null; + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponses = + streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.OFFLINE, + offlineBrokerRequest, offlineRoutingTable, timeoutMs, true, 1); + responseMap.putAll(serverResponses); + } + if (realtimeBrokerRequest != null) { + // to realtime servers. + assert realtimeRoutingTable != null; + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponses = + streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.REALTIME, + realtimeBrokerRequest, realtimeRoutingTable, timeoutMs, true, 1); + responseMap.putAll(serverResponses); + } + BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse( + originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics); + return brokerResponse; + } + + /** + * Query pinot server for data table. + */ + public Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> streamingQueryToPinotServer( + final long requestId, + final String brokerHost, + final String rawTableName, + final TableType tableType, + BrokerRequest brokerRequest, + Map<ServerInstance, List<String>> routingTable, + long connectionTimeoutInMillis, + boolean ignoreEmptyResponses, + int pinotRetryCount) { + // Retries will all hit the same server because the routing decision has already been made by the pinot broker + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap = new HashMap<>(); + for (Map.Entry<ServerInstance, List<String>> routingEntry : routingTable.entrySet()) { + ServerInstance serverInstance = routingEntry.getKey(); + List<String> segments = routingEntry.getValue(); + String serverHost = serverInstance.getHostname(); + int port = serverInstance.getGrpcPort(); + // ensure concurrent request count from the same grpc server cannot exceed maximum + if (!_concurrentQueriesCountMap.containsKey(serverHost)) { + _concurrentQueriesCountMap.put(serverHost, new AtomicInteger(0)); + } + int concurrentQueryNum = _concurrentQueriesCountMap.get(serverHost).get(); + if (concurrentQueryNum > _maxBacklogPerServer) { + ProcessingException processingException = new ProcessingException(QueryException.UNKNOWN_ERROR_CODE); + processingException.setMessage("Reaching server query max backlog size is - " + _maxBacklogPerServer); + throw new RuntimeException(processingException); + } + // TODO: enable throttling on per host bases. +// _concurrentQueriesCountMap.get(serverHost).incrementAndGet(); + Iterator<Server.ServerResponse> streamingResponse = _streamingQueryClient.submit(serverHost, port, + new GrpcRequestBuilder() + .setSegments(segments) + .setBrokerRequest(brokerRequest) + .setEnableStreaming(true)); + serverResponseMap.put(serverInstance.toServerRoutingInstance(tableType), streamingResponse); + } + return serverResponseMap; + } + + private static long makeGrpcRequestId(long requestId, int requestAttemptId) { Review comment: Is this used? We don't have retry support as of now ########## File path: pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java ########## @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.broker.api.RequestStatistics; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.RoutingManager; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.utils.grpc.GrpcQueryClient; +import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; +import org.apache.pinot.common.utils.helix.TableCache; +import org.apache.pinot.core.query.reduce.StreamingReduceService; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.core.transport.TlsConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +/** + * The <code>SingleConnectionBrokerRequestHandler</code> class is a thread-safe broker request handler using a single + * connection per server to route the queries. + */ +@ThreadSafe +public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final int DEFAULT_MAXIMUM_REQUEST_ATTEMPT = 10; + + private final GrpcQueryClient.Config _grpcConfig; + private final Map<String, AtomicInteger> _concurrentQueriesCountMap = new ConcurrentHashMap<>(); + private final int _maxBacklogPerServer; + private final int _grpcBrokerThreadPoolSize; + private final StreamingReduceService _streamingReduceService; + + private transient PinotStreamingQueryClient _streamingQueryClient; + + public GrpcBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + BrokerMetrics brokerMetrics, TlsConfig tlsConfig) { + super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + _grpcConfig = buildGrpcQueryClientConfig(config); + + // parse and fill constants + _maxBacklogPerServer = Integer.parseInt(_config.getProperty("MAXIMUM_BACKLOG_PER_SERVER", "1000")); + _grpcBrokerThreadPoolSize = Integer.parseInt(_config.getProperty("GRPC_BROKER_THREAD_POOL_SIZE", "10")); + + // create streaming query client + _streamingQueryClient = new PinotStreamingQueryClient(_grpcConfig); + + // create streaming reduce service + _streamingReduceService = new StreamingReduceService(config); + } + + @Override + public void start() { + } + + @Override + public synchronized void shutDown() { + _streamingReduceService.shutDown(); + } + + @Override + protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, + @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, + long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics) + throws Exception { + assert offlineBrokerRequest != null || realtimeBrokerRequest != null; + + String rawTableName = TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName()); + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>(); + // Making request to a streaming server response. + + if (offlineBrokerRequest != null) { + // to offline servers. + assert offlineRoutingTable != null; + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponses = + streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.OFFLINE, + offlineBrokerRequest, offlineRoutingTable, timeoutMs, true, 1); + responseMap.putAll(serverResponses); + } + if (realtimeBrokerRequest != null) { + // to realtime servers. + assert realtimeRoutingTable != null; + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponses = + streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.REALTIME, + realtimeBrokerRequest, realtimeRoutingTable, timeoutMs, true, 1); + responseMap.putAll(serverResponses); + } + BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse( + originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics); + return brokerResponse; + } + + /** + * Query pinot server for data table. + */ + public Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> streamingQueryToPinotServer( + final long requestId, Review comment: (minor, format) Can we put them into fewer lines? Also we don't usually put `final` for method arguments ########## File path: pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java ########## @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.broker.api.RequestStatistics; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.RoutingManager; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.utils.grpc.GrpcQueryClient; +import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; +import org.apache.pinot.common.utils.helix.TableCache; +import org.apache.pinot.core.query.reduce.StreamingReduceService; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.core.transport.TlsConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +/** + * The <code>SingleConnectionBrokerRequestHandler</code> class is a thread-safe broker request handler using a single + * connection per server to route the queries. + */ +@ThreadSafe +public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final int DEFAULT_MAXIMUM_REQUEST_ATTEMPT = 10; + + private final GrpcQueryClient.Config _grpcConfig; + private final Map<String, AtomicInteger> _concurrentQueriesCountMap = new ConcurrentHashMap<>(); + private final int _maxBacklogPerServer; + private final int _grpcBrokerThreadPoolSize; + private final StreamingReduceService _streamingReduceService; + + private transient PinotStreamingQueryClient _streamingQueryClient; + + public GrpcBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + BrokerMetrics brokerMetrics, TlsConfig tlsConfig) { + super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + _grpcConfig = buildGrpcQueryClientConfig(config); + + // parse and fill constants + _maxBacklogPerServer = Integer.parseInt(_config.getProperty("MAXIMUM_BACKLOG_PER_SERVER", "1000")); + _grpcBrokerThreadPoolSize = Integer.parseInt(_config.getProperty("GRPC_BROKER_THREAD_POOL_SIZE", "10")); + + // create streaming query client + _streamingQueryClient = new PinotStreamingQueryClient(_grpcConfig); + + // create streaming reduce service + _streamingReduceService = new StreamingReduceService(config); + } + + @Override + public void start() { + } + + @Override + public synchronized void shutDown() { + _streamingReduceService.shutDown(); + } + + @Override + protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, + @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, + long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics) + throws Exception { + assert offlineBrokerRequest != null || realtimeBrokerRequest != null; + + String rawTableName = TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName()); + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>(); + // Making request to a streaming server response. + + if (offlineBrokerRequest != null) { + // to offline servers. + assert offlineRoutingTable != null; + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponses = + streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.OFFLINE, Review comment: We can reduce the garbage created by passing in the responseMap and directly adding entries to it ########## File path: pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java ########## @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.broker.api.RequestStatistics; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.RoutingManager; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.utils.grpc.GrpcQueryClient; +import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; +import org.apache.pinot.common.utils.helix.TableCache; +import org.apache.pinot.core.query.reduce.StreamingReduceService; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.core.transport.TlsConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +/** + * The <code>SingleConnectionBrokerRequestHandler</code> class is a thread-safe broker request handler using a single + * connection per server to route the queries. + */ +@ThreadSafe +public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final int DEFAULT_MAXIMUM_REQUEST_ATTEMPT = 10; + + private final GrpcQueryClient.Config _grpcConfig; + private final Map<String, AtomicInteger> _concurrentQueriesCountMap = new ConcurrentHashMap<>(); + private final int _maxBacklogPerServer; + private final int _grpcBrokerThreadPoolSize; + private final StreamingReduceService _streamingReduceService; + + private transient PinotStreamingQueryClient _streamingQueryClient; + + public GrpcBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + BrokerMetrics brokerMetrics, TlsConfig tlsConfig) { + super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + _grpcConfig = buildGrpcQueryClientConfig(config); + + // parse and fill constants + _maxBacklogPerServer = Integer.parseInt(_config.getProperty("MAXIMUM_BACKLOG_PER_SERVER", "1000")); Review comment: These constants should be declared in the `CommonConstants` ########## File path: pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java ########## @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.broker.api.RequestStatistics; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.RoutingManager; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.utils.grpc.GrpcQueryClient; +import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; +import org.apache.pinot.common.utils.helix.TableCache; +import org.apache.pinot.core.query.reduce.StreamingReduceService; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.core.transport.TlsConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +/** + * The <code>SingleConnectionBrokerRequestHandler</code> class is a thread-safe broker request handler using a single + * connection per server to route the queries. + */ +@ThreadSafe +public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final int DEFAULT_MAXIMUM_REQUEST_ATTEMPT = 10; + + private final GrpcQueryClient.Config _grpcConfig; + private final Map<String, AtomicInteger> _concurrentQueriesCountMap = new ConcurrentHashMap<>(); + private final int _maxBacklogPerServer; + private final int _grpcBrokerThreadPoolSize; + private final StreamingReduceService _streamingReduceService; + + private transient PinotStreamingQueryClient _streamingQueryClient; Review comment: Why is this `transient`? it should be `final` ########## File path: pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java ########## @@ -235,14 +236,21 @@ public void start() // Configure TLS for netty connection to server TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX); - if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) { + if (_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE) + .equals(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) { Review comment: `equalsIgnoreCase` to be more robust? Also suggest adding some info log for debugging purpose ########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java ########## @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.reduce; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.response.broker.SelectionResults; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.core.util.QueryOptionsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class SelectionOnlyStreamingReducer implements StreamingReducer { + private static final Logger LOGGER = LoggerFactory.getLogger(SelectionOnlyStreamingReducer.class); + + private final QueryContext _queryContext; + private final boolean _preserveType; + private final boolean _responseFormatSql; Review comment: Let's only support sql response format ########## File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java ########## @@ -217,6 +217,10 @@ private CommonConstants() { public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = "pinot.broker.groupby.trim.threshold"; public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000; + public static final String BROKER_REQUEST_HANDLER_TYPE = "pinot.broker.request.handler.type"; + public static final String DEFAULT_BROKER_REQUEST_HANDLER_TYPE = "netty"; + public static final String GRPC_BROKER_REQUEST_HANDLER_TYPE = "grpc"; Review comment: (minor) Let's also add one for netty ```suggestion public static final String DEFAULT_BROKER_REQUEST_HANDLER_TYPE = NETTY_BROKER_REQUEST_HANDLER_TYPE; public static final String NETTY_BROKER_REQUEST_HANDLER_TYPE = "netty"; public static final String GRPC_BROKER_REQUEST_HANDLER_TYPE = "grpc"; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org