This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 01dda9ddfa pinot-java-client controller based broker selector (#8467) 01dda9ddfa is described below commit 01dda9ddfa2dab4ad71f953855b960f1b890cc6c Author: Saurabh Dubey <saurabhd...@gmail.com> AuthorDate: Wed May 4 00:30:05 2022 +0530 pinot-java-client controller based broker selector (#8467) Add support for getting broker data using controller APIs, in the pinot-java-client. This will help in removing ZKClient dependency in the client --- .../java/org/apache/pinot/client/BrokerCache.java | 153 +++++++++++++++++++++ .../pinot/client/BrokerCacheUpdaterPeriodic.java | 83 +++++++++++ .../java/org/apache/pinot/client/BrokerData.java | 41 ++++++ .../org/apache/pinot/client/ConnectionFactory.java | 31 +++++ .../client/ControllerBasedBrokerSelector.java | 57 ++++++++ .../apache/pinot/client/UpdatableBrokerCache.java | 60 ++++++++ .../api/resources/PinotTableInstances.java | 17 +++ .../helix/core/PinotHelixResourceManager.java | 36 +++++ .../helix/core/PinotHelixResourceManagerTest.java | 44 ++++++ .../utils/builder/ControllerRequestURLBuilder.java | 4 + 10 files changed, 526 insertions(+) diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java new file mode 100644 index 0000000000..8f296d3d9c --- /dev/null +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.client; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.BoundRequestBuilder; +import org.asynchttpclient.Dsl; +import org.asynchttpclient.Response; + + +/** + * Maintains table -> list of brokers, supports update + * TODO can we introduce a SSE based controller endpoint to make the update reactive in the client? + */ +public class BrokerCache { + + @JsonIgnoreProperties(ignoreUnknown = true) + private static class BrokerInstance { + private String _host; + private Integer _port; + + public String getHost() { + return _host; + } + + public void setHost(String host) { + _host = host; + } + + public Integer getPort() { + return _port; + } + + public void setPort(Integer port) { + _port = port; + } + } + + private static final TypeReference<Map<String, List<BrokerInstance>>> RESPONSE_TYPE_REF = + new TypeReference<Map<String, List<BrokerInstance>>>() { + }; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final Random _random = new Random(); + private final AsyncHttpClient _client; + private final String _address; + private volatile BrokerData _brokerData; + + public BrokerCache(String scheme, String controllerHost, int controllerPort) { + _client = Dsl.asyncHttpClient(); + ControllerRequestURLBuilder controllerRequestURLBuilder = + ControllerRequestURLBuilder.baseUrl(scheme + "://" + controllerHost + ":" + controllerPort); + _address = controllerRequestURLBuilder.forLiveBrokerTablesGet(); + } + + private Map<String, List<BrokerInstance>> getTableToBrokersData() throws Exception { + BoundRequestBuilder getRequest = _client.prepareGet(_address); + Future<Response> responseFuture = getRequest.addHeader("accept", "application/json").execute(); + Response response = responseFuture.get(); + String responseBody = response.getResponseBody(StandardCharsets.UTF_8); + return OBJECT_MAPPER.readValue(responseBody, RESPONSE_TYPE_REF); + } + + private BrokerData getBrokerData(Map<String, List<BrokerInstance>> responses) { + Set<String> brokers = new HashSet<>(); + Map<String, List<String>> tableToBrokersMap = new HashMap<>(); + Set<String> uniqueTableNames = new HashSet<>(); + + for (Map.Entry<String, List<BrokerInstance>> tableToBrokers : responses.entrySet()) { + List<String> brokersForTable = new ArrayList<>(); + tableToBrokers.getValue().forEach(br -> { + String brokerHostPort = br.getHost() + ":" + br.getPort(); + brokersForTable.add(brokerHostPort); + brokers.add(brokerHostPort); + }); + String tableName = tableToBrokers.getKey(); + tableToBrokersMap.put(tableName, brokersForTable); + + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + uniqueTableNames.add(rawTableName); + } + + // Add table names without suffixes + uniqueTableNames.forEach(tableName -> { + if (!tableToBrokersMap.containsKey(tableName)) { + // 2 possible scenarios: + // 1) Both OFFLINE_SUFFIX and REALTIME_SUFFIX tables present -> use intersection of both the lists + // 2) Either OFFLINE_SUFFIX or REALTIME_SUFFIX (and npt both) raw table present -> use the list as it is + + String offlineTable = tableName + ExternalViewReader.OFFLINE_SUFFIX; + String realtimeTable = tableName + ExternalViewReader.REALTIME_SUFFIX; + if (tableToBrokersMap.containsKey(offlineTable) && tableToBrokersMap.containsKey(realtimeTable)) { + List<String> realtimeBrokers = tableToBrokersMap.get(realtimeTable); + List<String> offlineBrokers = tableToBrokersMap.get(offlineTable); + List<String> tableBrokers = + realtimeBrokers.stream().filter(offlineBrokers::contains).collect(Collectors.toList()); + tableToBrokersMap.put(tableName, tableBrokers); + } else { + tableToBrokersMap.put(tableName, tableToBrokersMap.getOrDefault(offlineTable, + tableToBrokersMap.getOrDefault(realtimeTable, new ArrayList<>()))); + } + } + }); + + return new BrokerData(tableToBrokersMap, new ArrayList<>(brokers)); + } + + protected void updateBrokerData() + throws Exception { + Map<String, List<BrokerInstance>> responses = getTableToBrokersData(); + _brokerData = getBrokerData(responses); + } + + public String getBroker(String tableName) { + List<String> brokers = + (tableName == null) ? _brokerData.getBrokers() : _brokerData.getTableToBrokerMap().get(tableName); + return brokers.get(_random.nextInt(brokers.size())); + } + + public List<String> getBrokers() { + return _brokerData.getBrokers(); + } +} diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java new file mode 100644 index 0000000000..404a5f7b03 --- /dev/null +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.client; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains broker cache this is updated periodically + */ +public class BrokerCacheUpdaterPeriodic implements UpdatableBrokerCache { + private final BrokerCache _brokerCache; + private final ScheduledExecutorService _scheduledExecutorService; + private final long _brokerUpdateFreqInMillis; + + private static final Logger LOGGER = LoggerFactory.getLogger(BrokerCacheUpdaterPeriodic.class); + + public BrokerCacheUpdaterPeriodic(String scheme, String controllerHost, + int controllerPort, long brokerUpdateFreqInMillis) { + _brokerCache = new BrokerCache(scheme, controllerHost, controllerPort); + _scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + _brokerUpdateFreqInMillis = brokerUpdateFreqInMillis; + } + + public void init() throws Exception { + _brokerCache.updateBrokerData(); + + if (_brokerUpdateFreqInMillis > 0) { + _scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + _brokerCache.updateBrokerData(); + } catch (Exception e) { + LOGGER.error("Broker cache update failed", e); + } + } + }, 0, _brokerUpdateFreqInMillis, TimeUnit.MILLISECONDS); + } + } + + public String getBroker(String tableName) { + return _brokerCache.getBroker(tableName); + } + + @Override + public List<String> getBrokers() { + return _brokerCache.getBrokers(); + } + + @Override + public void triggerBrokerCacheUpdate() throws Exception { + _brokerCache.updateBrokerData(); + } + + public void close() { + try { + _scheduledExecutorService.shutdown(); + } catch (Exception e) { + LOGGER.error("Cannot shutdown Broker Cache update periodic task", e); + } + } +} diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerData.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerData.java new file mode 100644 index 0000000000..ed1d7fea8d --- /dev/null +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerData.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.client; + +import java.util.List; +import java.util.Map; + + +public class BrokerData { + private final Map<String, List<String>> _tableToBrokerMap; + private final List<String> _brokers; + + public Map<String, List<String>> getTableToBrokerMap() { + return _tableToBrokerMap; + } + + public List<String> getBrokers() { + return _brokers; + } + + public BrokerData(Map<String, List<String>> tableToBrokerMap, List<String> brokers) { + _tableToBrokerMap = tableToBrokerMap; + _brokers = brokers; + } +} diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java index a081491555..57dc0bdfec 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java @@ -58,6 +58,37 @@ public class ConnectionFactory { } } + /** + * Creates a connection to Pinot cluster, given its Controller URL + * + * @param scheme controller URL scheme + * @param controllerHost controller host + * @param controllerPort controller port + * @return A connection that connects to brokers as per the given controller + */ + public static Connection fromController(String scheme, String controllerHost, int controllerPort) { + return fromController(scheme, controllerHost, controllerPort, 1000); + } + + /** + * + * @param scheme controller URL scheme + * @param controllerHost controller host + * @param controllerPort controller port + * @param brokerUpdateFreqInMillis frequency of broker data refresh using controller APIs + * @return A connection that connects to brokers as per the given controller + */ + public static Connection fromController(String scheme, String controllerHost, int controllerPort, + long brokerUpdateFreqInMillis) { + try { + return new Connection(new Properties(), + new ControllerBasedBrokerSelector(scheme, controllerHost, controllerPort, brokerUpdateFreqInMillis), + getDefault()); + } catch (Exception e) { + throw new PinotClientException(e); + } + } + /** * Creates a connection to a Pinot cluster, given its Zookeeper URL * diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java new file mode 100644 index 0000000000..2a86d24386 --- /dev/null +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.client; + +import java.util.List; + + +/** + * Maintains broker cache using controller APIs + */ +public class ControllerBasedBrokerSelector implements BrokerSelector { + private final UpdatableBrokerCache _brokerCache; + + public ControllerBasedBrokerSelector(String scheme, String controllerHost, int controllerPort, + long brokerUpdateFreqInMillis) + throws Exception { + _brokerCache = new BrokerCacheUpdaterPeriodic(scheme, controllerHost, controllerPort, brokerUpdateFreqInMillis); + _brokerCache.init(); + } + + @Override + public String selectBroker(String table) { + return _brokerCache.getBroker(table); + } + + @Override + public List<String> getBrokers() { + return _brokerCache.getBrokers(); + } + + @Override + public void close() { + _brokerCache.close(); + } + + public void updateBrokers() + throws Exception { + _brokerCache.triggerBrokerCacheUpdate(); + } +} diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java new file mode 100644 index 0000000000..1e19dff789 --- /dev/null +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.client; + +import java.util.List; + + +/** + * A updatable cache of table name to list of eligible brokers. + * Implementations should implement manual refreshing mechanism + */ +public interface UpdatableBrokerCache { + /** + * Initializes the cache + * @throws Exception + */ + void init() + throws Exception; + + /** + * Method to get one random broker for a given table + * @param tableName + * @return Broker address corresponding to the table + */ + String getBroker(String tableName); + + /** + * Returns all the brokers currently in the cache + * @return List of all avaliable brokers + */ + List<String> getBrokers(); + + /** + * Manually trigger a cache refresh + * @throws Exception + */ + void triggerBrokerCacheUpdate() + throws Exception; + + /** + * Closes the cache and release any resources + */ + void close(); +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java index 45c6db2002..69e0091803 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java @@ -26,6 +26,7 @@ import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import java.util.List; +import java.util.Map; import javax.inject.Inject; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; @@ -140,4 +141,20 @@ public class PinotTableInstances { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND); } } + + @GET + @Path("/tables/livebrokers") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "List tables to live brokers mappings", notes = "List tables to live brokers mappings based " + + "on EV") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error") + }) + public Map<String, List<InstanceInfo>> getLiveBrokers() { + try { + return _pinotHelixResourceManager.getTableToLiveBrokersMapping(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND); + } + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 2f9a43718f..f32e2e75e9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -44,6 +44,7 @@ import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.ws.rs.BadRequestException; @@ -106,6 +107,7 @@ import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.api.exception.InvalidTableConfigException; import org.apache.pinot.controller.api.exception.TableAlreadyExistsException; +import org.apache.pinot.controller.api.resources.InstanceInfo; import org.apache.pinot.controller.api.resources.StateType; import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; @@ -3378,6 +3380,40 @@ public class PinotHelixResourceManager { return new TableStats(creationTime); } + /** + * Returns map of tableName to list of live brokers + * @return Map of tableName to list of ONLINE brokers serving the table + */ + public Map<String, List<InstanceInfo>> getTableToLiveBrokersMapping() { + ExternalView ev = _helixDataAccessor.getProperty(_keyBuilder.externalView(Helix.BROKER_RESOURCE_INSTANCE)); + if (ev == null) { + throw new IllegalStateException("Failed to find external view for " + Helix.BROKER_RESOURCE_INSTANCE); + } + + // Map of instanceId -> InstanceConfig + Map<String, InstanceConfig> instanceConfigMap = HelixHelper.getInstanceConfigs(_helixZkManager) + .stream().collect(Collectors.toMap(InstanceConfig::getInstanceName, Function.identity())); + + Map<String, List<InstanceInfo>> result = new HashMap<>(); + ZNRecord znRecord = ev.getRecord(); + for (Map.Entry<String, Map<String, String>> tableToBrokersEntry : znRecord.getMapFields().entrySet()) { + String tableName = tableToBrokersEntry.getKey(); + Map<String, String> brokersToState = tableToBrokersEntry.getValue(); + List<InstanceInfo> hosts = new ArrayList<>(); + for (Map.Entry<String, String> brokerEntry : brokersToState.entrySet()) { + if ("ONLINE".equalsIgnoreCase(brokerEntry.getValue()) && instanceConfigMap.containsKey(brokerEntry.getKey())) { + InstanceConfig instanceConfig = instanceConfigMap.get(brokerEntry.getKey()); + hosts.add(new InstanceInfo(instanceConfig.getInstanceName(), instanceConfig.getHostName(), + Integer.parseInt(instanceConfig.getPort()))); + } + } + if (!hosts.isEmpty()) { + result.put(tableName, hosts); + } + } + return result; + } + /** * Return the list of live brokers serving the corresponding table. Based on the * input tableName, there can be 3 cases: diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java index d1a63347a6..996f5b521c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java @@ -53,6 +53,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.helix.LeadControllerUtils; import org.apache.pinot.controller.ControllerTestUtils; +import org.apache.pinot.controller.api.resources.InstanceInfo; import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.instance.Instance; @@ -1085,6 +1086,49 @@ public class PinotHelixResourceManagerTest { throw new RuntimeException("Timeout while waiting for segments to be deleted"); } + @Test + public void testGetTableToLiveBrokersMapping() + throws IOException { + Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 2, 0, 0); + PinotResourceManagerResponse response = + ControllerTestUtils.getHelixResourceManager().createBrokerTenant(brokerTenant); + Assert.assertTrue(response.isSuccessful()); + // Create the table + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setNumReplicas(ControllerTestUtils.MIN_NUM_REPLICAS).setBrokerTenant(BROKER_TENANT_NAME) + .setServerTenant(SERVER_TENANT_NAME).build(); + ControllerTestUtils.getHelixResourceManager().addTable(tableConfig); + // Introduce a wait here for the EV is updated with live brokers for a table. + TestUtils.waitForCondition(aVoid -> { + ExternalView externalView = ControllerTestUtils.getHelixResourceManager().getHelixAdmin() + .getResourceExternalView(ControllerTestUtils.getHelixClusterName(), + CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + int onlineBrokersCnt = 0; + Map<String, String> brokerToStateMap = externalView.getStateMap(OFFLINE_TABLE_NAME); + if (brokerToStateMap == null) { + return false; + } + for (Map.Entry<String, String> entry : brokerToStateMap.entrySet()) { + if ("ONLINE".equalsIgnoreCase(entry.getValue())) { + onlineBrokersCnt++; + } + } + return onlineBrokersCnt == 2; + }, TIMEOUT_IN_MS, ""); + + Map<String, List<InstanceInfo>> tableToBrokersMapping = + ControllerTestUtils.getHelixResourceManager().getTableToLiveBrokersMapping(); + + Assert.assertEquals(tableToBrokersMapping.size(), 1); + Assert.assertEquals(tableToBrokersMapping.get(OFFLINE_TABLE_NAME).size(), 2); + + // Delete the table + ControllerTestUtils.getHelixResourceManager().deleteOfflineTable(TABLE_NAME); + ControllerTestUtils.getHelixResourceManager().deleteRealtimeTable(TABLE_NAME); + // Clean up. + untagBrokers(); + } + @Test public void testGetLiveBrokersForTable() throws IOException, TableNotFoundException { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index 2175994813..9f57222e2b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -142,6 +142,10 @@ public class ControllerRequestURLBuilder { return StringUtil.join("/", _baseUrl, "brokers", "tables", "?state=" + state); } + public String forLiveBrokerTablesGet() { + return StringUtil.join("/", _baseUrl, "tables", "livebrokers"); + } + public String forBrokerTableGet(String table, String tableType, String state) { StringBuilder params = new StringBuilder(); if (tableType != null) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org