KKcorps commented on code in PR #8467: URL: https://github.com/apache/pinot/pull/8467#discussion_r854852620
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -3374,6 +3376,40 @@ public TableStats getTableStats(String tableNameWithType) { return new TableStats(creationTime); } + /** + * Returns map of tableName to list of live brokers + * @return Map of tableName to list of ONLINE brokers serving the table + */ + public Map<String, List<InstanceInfo>> getLiveBrokers() { Review Comment: We should rename this method to reflect that it returns a table-to-hosts mapping rather than a tenant-to-hosts mapping. Maybe something like `getLiveBrokersForTables`. ########## pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java: ########## @@ -43,6 +43,29 @@ public static Connection fromZookeeper(String zkUrl) { return fromZookeeper(zkUrl, getDefault()); } + /** + * Creates a connection to Pinot cluster, given its Controller URL + * + * @param scheme controller URL scheme + * @param controllerHost controller host + * @param controllerPort controller port + * @return A connection that connects to brokers as per the given controller + */ + public static Connection fromController(String scheme, String controllerHost, int controllerPort) { + return fromController(scheme, controllerHost, controllerPort, 1000); + } + + public static Connection fromController(String scheme, String controllerHost, int controllerPort, Review Comment: Add Javadoc for this overload as well. ########## pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ControllerBrokerSelectorTest.java: ########## @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.client; + +public class ControllerBrokerSelectorTest { + private ControllerBrokerSelectorTest() { + } + + public static void main(String[] args) { Review Comment: This should go inside a `@Test` method rather than a `main` method. ########## pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java: ########## @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ */ +package org.apache.pinot.client; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains broker cache this is updated periodically + */ +public class BrokerCacheUpdaterPeriodic implements UpdatableBrokerCache { + private final BrokerCache _brokerCache; + private final ScheduledExecutorService _scheduledExecutorService; + private final long _brokerUpdateFreqInMillis; + + private static final Logger LOGGER = LoggerFactory.getLogger(BrokerCacheUpdaterPeriodic.class); + + public BrokerCacheUpdaterPeriodic(String scheme, String controllerHost, + int controllerPort, long brokerUpdateFreqInMillis) { + _brokerCache = new BrokerCache(scheme, controllerHost, controllerPort); + _scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + _brokerUpdateFreqInMillis = brokerUpdateFreqInMillis; + } + + public void init() throws Exception { + try { + _brokerCache.updateBrokerData(); + } catch (Exception e) { + LOGGER.error("Broker cache update failed", e); + throw e; Review Comment: Remove `throw e` ########## pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java: ########## @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.client; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains broker cache this is updated periodically + */ +public class BrokerCacheUpdaterPeriodic implements UpdatableBrokerCache { + private final BrokerCache _brokerCache; + private final ScheduledExecutorService _scheduledExecutorService; + private final long _brokerUpdateFreqInMillis; + + private static final Logger LOGGER = LoggerFactory.getLogger(BrokerCacheUpdaterPeriodic.class); + + public BrokerCacheUpdaterPeriodic(String scheme, String controllerHost, + int controllerPort, long brokerUpdateFreqInMillis) { + _brokerCache = new BrokerCache(scheme, controllerHost, controllerPort); + _scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + _brokerUpdateFreqInMillis = brokerUpdateFreqInMillis; + } + + public void init() throws Exception { + try { + _brokerCache.updateBrokerData(); + } catch (Exception e) { + LOGGER.error("Broker cache update failed", e); + throw e; + } + + if (_brokerUpdateFreqInMillis > 0) { + _scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + _brokerCache.updateBrokerData(); + } catch (Exception e) { + LOGGER.error("Broker cache update failed", e); + } + } + }, 0, _brokerUpdateFreqInMillis, TimeUnit.MILLISECONDS); + } + } + + public String getBroker(String tableName) { + return 
_brokerCache.getBroker(tableName); + } + + @Override + public List<String> getBrokers() { + return _brokerCache.getBrokers(); + } + + @Override + public void triggerBrokerCacheUpdate() throws Exception { + _brokerCache.updateBrokerData(); + } + + public void close() { + try { + _scheduledExecutorService.shutdown(); + } catch (Exception e) { + LOGGER.error("Broker cache update task cancellation failed", e); Review Comment: Maybe something like `Cannot shutdown Broker Cache update periodic task` ########## pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java: ########## @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.client; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.BoundRequestBuilder; +import org.asynchttpclient.Dsl; +import org.asynchttpclient.Response; + + +/** + * Maintains table -> list of brokers, supports update + * TODO can we introduce a simple websocket based controller endpoint to make the update reactive in the client? + */ +public class BrokerCache { + + @JsonIgnoreProperties(ignoreUnknown = true) + private static class BrokerInstance { + private String _host; + private Integer _port; + + public String getHost() { + return _host; + } + + public void setHost(String host) { + _host = host; + } + + public Integer getPort() { + return _port; + } + + public void setPort(Integer port) { + _port = port; + } + } + + private static final TypeReference<Map<String, List<BrokerInstance>>> RESPONSE_TYPE_REF = + new TypeReference<Map<String, List<BrokerInstance>>>() { + }; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final Random _random = new Random(); + private final AsyncHttpClient _client; + private final String _address; + private volatile BrokerData _brokerData; + + public BrokerCache(String scheme, String controllerHost, int controllerPort) { + _client = Dsl.asyncHttpClient(); + ControllerRequestURLBuilder controllerRequestURLBuilder = + ControllerRequestURLBuilder.baseUrl(scheme + 
"://" + controllerHost + ":" + controllerPort); + _address = controllerRequestURLBuilder.forLiveBrokerTablesGet(); + } + + private Map<String, List<BrokerInstance>> getTableToBrokersData() throws Exception { + BoundRequestBuilder getRequest = _client.prepareGet(_address); + Future<Response> responseFuture = getRequest.addHeader("accept", "application/json").execute(); + Response response = responseFuture.get(); + String responseBody = response.getResponseBody(StandardCharsets.UTF_8); + return OBJECT_MAPPER.readValue(responseBody, RESPONSE_TYPE_REF); + } + + private BrokerData getBrokerData(Map<String, List<BrokerInstance>> responses) { + Set<String> brokers = new HashSet<>(); + Map<String, List<String>> tableToBrokersMap = new HashMap<>(); + Set<String> uniqueTableNames = new HashSet<>(); + + for (Map.Entry<String, List<BrokerInstance>> tableToBrokers : responses.entrySet()) { + List<String> brokersForTable = new ArrayList<>(); + tableToBrokers.getValue().forEach(br -> { + String brokerHostPort = br.getHost() + ":" + br.getPort(); + brokersForTable.add(brokerHostPort); + brokers.add(brokerHostPort); + }); + String tableName = tableToBrokers.getKey(); + tableToBrokersMap.put(tableName, brokersForTable); + + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + uniqueTableNames.add(rawTableName); + } + + // Add table names without suffixes + uniqueTableNames.forEach(tableName -> { + if (!tableToBrokersMap.containsKey(tableName)) { + // 2 possible scenarios: + // 1) Both OFFLINE_SUFFIX and REALTIME_SUFFIX tables present -> use intersection of both the lists + // 2) Either OFFLINE_SUFFIX or REALTIME_SUFFIX (and npt both) raw table present -> use the list as it is + + String offlineTable = tableName + ExternalViewReader.OFFLINE_SUFFIX; + String realtimeTable = tableName + ExternalViewReader.REALTIME_SUFFIX; + if (tableToBrokersMap.containsKey(offlineTable) && tableToBrokersMap.containsKey(realtimeTable)) { + List<String> realtimeBrokers = 
tableToBrokersMap.get(realtimeTable); + List<String> offlineBrokers = tableToBrokersMap.get(offlineTable); + List<String> tableBrokers = realtimeBrokers + .stream() + .filter(offlineBrokers::contains) + .collect(Collectors.toList()); + tableToBrokersMap.put(tableName, tableBrokers); + } else { + tableToBrokersMap.put(tableName, + tableToBrokersMap.getOrDefault(offlineTable, + tableToBrokersMap.getOrDefault(realtimeTable, new ArrayList<>()))); + } + } + }); + + return new BrokerData(tableToBrokersMap, new ArrayList<>(brokers)); + } + + protected void updateBrokerData() + throws Exception { + Map<String, List<BrokerInstance>> responses = getTableToBrokersData(); + _brokerData = getBrokerData(responses); + } + + public String getBroker(String tableName) { + List<String> brokers = (tableName == null) ? _brokerData._brokers : _brokerData._tableToBrokerMap.get(tableName); + return brokers.get(_random.nextInt(brokers.size())); + } + + public List<String> getBrokers() { + return _brokerData._brokers; + } + + public static class BrokerData { + public Map<String, List<String>> _tableToBrokerMap; Review Comment: Is there a particular reason for keeping these fields public? They should be private and exposed through accessors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org