This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 01dda9ddfa pinot-java-client controller based broker selector (#8467)
01dda9ddfa is described below

commit 01dda9ddfa2dab4ad71f953855b960f1b890cc6c
Author: Saurabh Dubey <saurabhd...@gmail.com>
AuthorDate: Wed May 4 00:30:05 2022 +0530

    pinot-java-client controller based broker selector (#8467)
    
    Add support for getting broker data using controller APIs, in the 
pinot-java-client. This will help in removing ZKClient dependency in the client
---
 .../java/org/apache/pinot/client/BrokerCache.java  | 153 +++++++++++++++++++++
 .../pinot/client/BrokerCacheUpdaterPeriodic.java   |  83 +++++++++++
 .../java/org/apache/pinot/client/BrokerData.java   |  41 ++++++
 .../org/apache/pinot/client/ConnectionFactory.java |  31 +++++
 .../client/ControllerBasedBrokerSelector.java      |  57 ++++++++
 .../apache/pinot/client/UpdatableBrokerCache.java  |  60 ++++++++
 .../api/resources/PinotTableInstances.java         |  17 +++
 .../helix/core/PinotHelixResourceManager.java      |  36 +++++
 .../helix/core/PinotHelixResourceManagerTest.java  |  44 ++++++
 .../utils/builder/ControllerRequestURLBuilder.java |   4 +
 10 files changed, 526 insertions(+)

diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
new file mode 100644
index 0000000000..8f296d3d9c
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.Dsl;
+import org.asynchttpclient.Response;
+
+
+/**
+ * Maintains table -> list of brokers, supports update
+ * TODO can we introduce a SSE based controller endpoint to make the update 
reactive in the client?
+ */
+public class BrokerCache {
+
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  private static class BrokerInstance {
+    private String _host;
+    private Integer _port;
+
+    public String getHost() {
+      return _host;
+    }
+
+    public void setHost(String host) {
+      _host = host;
+    }
+
+    public Integer getPort() {
+      return _port;
+    }
+
+    public void setPort(Integer port) {
+      _port = port;
+    }
+  }
+
+  private static final TypeReference<Map<String, List<BrokerInstance>>> 
RESPONSE_TYPE_REF =
+      new TypeReference<Map<String, List<BrokerInstance>>>() {
+      };
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private final Random _random = new Random();
+  private final AsyncHttpClient _client;
+  private final String _address;
+  private volatile BrokerData _brokerData;
+
+  public BrokerCache(String scheme, String controllerHost, int controllerPort) 
{
+    _client = Dsl.asyncHttpClient();
+    ControllerRequestURLBuilder controllerRequestURLBuilder =
+        ControllerRequestURLBuilder.baseUrl(scheme + "://" + controllerHost + 
":" + controllerPort);
+    _address = controllerRequestURLBuilder.forLiveBrokerTablesGet();
+  }
+
+  private Map<String, List<BrokerInstance>> getTableToBrokersData() throws 
Exception {
+    BoundRequestBuilder getRequest = _client.prepareGet(_address);
+    Future<Response> responseFuture = getRequest.addHeader("accept", 
"application/json").execute();
+    Response response = responseFuture.get();
+    String responseBody = response.getResponseBody(StandardCharsets.UTF_8);
+    return OBJECT_MAPPER.readValue(responseBody, RESPONSE_TYPE_REF);
+  }
+
+  private BrokerData getBrokerData(Map<String, List<BrokerInstance>> 
responses) {
+    Set<String> brokers = new HashSet<>();
+    Map<String, List<String>> tableToBrokersMap = new HashMap<>();
+    Set<String> uniqueTableNames = new HashSet<>();
+
+    for (Map.Entry<String, List<BrokerInstance>> tableToBrokers : 
responses.entrySet()) {
+      List<String> brokersForTable = new ArrayList<>();
+      tableToBrokers.getValue().forEach(br -> {
+        String brokerHostPort = br.getHost() + ":" + br.getPort();
+        brokersForTable.add(brokerHostPort);
+        brokers.add(brokerHostPort);
+      });
+      String tableName = tableToBrokers.getKey();
+      tableToBrokersMap.put(tableName, brokersForTable);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      uniqueTableNames.add(rawTableName);
+    }
+
+    // Add table names without suffixes
+    uniqueTableNames.forEach(tableName -> {
+      if (!tableToBrokersMap.containsKey(tableName)) {
+        // 2 possible scenarios:
+        // 1) Both OFFLINE_SUFFIX and REALTIME_SUFFIX tables present -> use 
intersection of both the lists
+        // 2) Either OFFLINE_SUFFIX or REALTIME_SUFFIX (and npt both) raw 
table present -> use the list as it is
+
+        String offlineTable = tableName + ExternalViewReader.OFFLINE_SUFFIX;
+        String realtimeTable = tableName + ExternalViewReader.REALTIME_SUFFIX;
+        if (tableToBrokersMap.containsKey(offlineTable) && 
tableToBrokersMap.containsKey(realtimeTable)) {
+          List<String> realtimeBrokers = tableToBrokersMap.get(realtimeTable);
+          List<String> offlineBrokers = tableToBrokersMap.get(offlineTable);
+          List<String> tableBrokers =
+              
realtimeBrokers.stream().filter(offlineBrokers::contains).collect(Collectors.toList());
+          tableToBrokersMap.put(tableName, tableBrokers);
+        } else {
+          tableToBrokersMap.put(tableName, 
tableToBrokersMap.getOrDefault(offlineTable,
+              tableToBrokersMap.getOrDefault(realtimeTable, new 
ArrayList<>())));
+        }
+      }
+    });
+
+    return new BrokerData(tableToBrokersMap, new ArrayList<>(brokers));
+  }
+
+  protected void updateBrokerData()
+      throws Exception {
+    Map<String, List<BrokerInstance>> responses = getTableToBrokersData();
+    _brokerData = getBrokerData(responses);
+  }
+
+  public String getBroker(String tableName) {
+    List<String> brokers =
+        (tableName == null) ? _brokerData.getBrokers() : 
_brokerData.getTableToBrokerMap().get(tableName);
+    return brokers.get(_random.nextInt(brokers.size()));
+  }
+
+  public List<String> getBrokers() {
+    return _brokerData.getBrokers();
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java
new file mode 100644
index 0000000000..404a5f7b03
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains broker cache this is updated periodically
+ */
+public class BrokerCacheUpdaterPeriodic implements UpdatableBrokerCache {
+  private final BrokerCache _brokerCache;
+  private final ScheduledExecutorService _scheduledExecutorService;
+  private final long _brokerUpdateFreqInMillis;
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerCacheUpdaterPeriodic.class);
+
+  public BrokerCacheUpdaterPeriodic(String scheme, String controllerHost,
+      int controllerPort, long brokerUpdateFreqInMillis) {
+    _brokerCache = new BrokerCache(scheme, controllerHost, controllerPort);
+    _scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    _brokerUpdateFreqInMillis = brokerUpdateFreqInMillis;
+  }
+
+  public void init() throws Exception {
+    _brokerCache.updateBrokerData();
+
+    if (_brokerUpdateFreqInMillis > 0) {
+      _scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            _brokerCache.updateBrokerData();
+          } catch (Exception e) {
+            LOGGER.error("Broker cache update failed", e);
+          }
+        }
+      }, 0, _brokerUpdateFreqInMillis, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  public String getBroker(String tableName) {
+    return _brokerCache.getBroker(tableName);
+  }
+
+  @Override
+  public List<String> getBrokers() {
+    return _brokerCache.getBrokers();
+  }
+
+  @Override
+  public void triggerBrokerCacheUpdate() throws Exception {
+    _brokerCache.updateBrokerData();
+  }
+
+  public void close() {
+    try {
+      _scheduledExecutorService.shutdown();
+    } catch (Exception e) {
+      LOGGER.error("Cannot shutdown Broker Cache update periodic task", e);
+    }
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerData.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerData.java
new file mode 100644
index 0000000000..ed1d7fea8d
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerData.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.List;
+import java.util.Map;
+
+
+public class BrokerData {
+  private final Map<String, List<String>> _tableToBrokerMap;
+  private final List<String> _brokers;
+
+  public Map<String, List<String>> getTableToBrokerMap() {
+    return _tableToBrokerMap;
+  }
+
+  public List<String> getBrokers() {
+    return _brokers;
+  }
+
+  public BrokerData(Map<String, List<String>> tableToBrokerMap, List<String> 
brokers) {
+    _tableToBrokerMap = tableToBrokerMap;
+    _brokers = brokers;
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
index a081491555..57dc0bdfec 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
@@ -58,6 +58,37 @@ public class ConnectionFactory {
     }
   }
 
+  /**
+   * Creates a connection to Pinot cluster, given its Controller URL
+   *
+   * @param scheme controller URL scheme
+   * @param controllerHost controller host
+   * @param controllerPort controller port
+   * @return A connection that connects to brokers as per the given controller
+   */
+  public static Connection fromController(String scheme, String 
controllerHost, int controllerPort) {
+    return fromController(scheme, controllerHost, controllerPort, 1000);
+  }
+
+  /**
+   *
+   * @param scheme controller URL scheme
+   * @param controllerHost controller host
+   * @param controllerPort controller port
+   * @param brokerUpdateFreqInMillis frequency of broker data refresh using 
controller APIs
+   * @return A connection that connects to brokers as per the given controller
+   */
+  public static Connection fromController(String scheme, String 
controllerHost, int controllerPort,
+      long brokerUpdateFreqInMillis) {
+    try {
+      return new Connection(new Properties(),
+          new ControllerBasedBrokerSelector(scheme, controllerHost, 
controllerPort, brokerUpdateFreqInMillis),
+          getDefault());
+    } catch (Exception e) {
+      throw new PinotClientException(e);
+    }
+  }
+
   /**
    * Creates a connection to a Pinot cluster, given its Zookeeper URL
    *
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
new file mode 100644
index 0000000000..2a86d24386
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.client;
+
+import java.util.List;
+
+
+/**
+ * Maintains broker cache using controller APIs
+ */
+public class ControllerBasedBrokerSelector implements BrokerSelector {
+  private final UpdatableBrokerCache _brokerCache;
+
+  public ControllerBasedBrokerSelector(String scheme, String controllerHost, 
int controllerPort,
+      long brokerUpdateFreqInMillis)
+      throws Exception {
+    _brokerCache = new BrokerCacheUpdaterPeriodic(scheme, controllerHost, 
controllerPort, brokerUpdateFreqInMillis);
+    _brokerCache.init();
+  }
+
+  @Override
+  public String selectBroker(String table) {
+    return _brokerCache.getBroker(table);
+  }
+
+  @Override
+  public List<String> getBrokers() {
+    return _brokerCache.getBrokers();
+  }
+
+  @Override
+  public void close() {
+    _brokerCache.close();
+  }
+
+  public void updateBrokers()
+      throws Exception {
+    _brokerCache.triggerBrokerCacheUpdate();
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java
new file mode 100644
index 0000000000..1e19dff789
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.List;
+
+
+/**
+ * A updatable cache of table name to list of eligible brokers.
+ * Implementations should implement manual refreshing mechanism
+ */
+public interface UpdatableBrokerCache {
+  /**
+   * Initializes the cache
+   * @throws Exception
+   */
+  void init()
+      throws Exception;
+
+  /**
+   * Method to get one random broker for a given table
+   * @param tableName
+   * @return Broker address corresponding to the table
+   */
+  String getBroker(String tableName);
+
+  /**
+   * Returns all the brokers currently in the cache
+   * @return List of all avaliable brokers
+   */
+  List<String> getBrokers();
+
+  /**
+   * Manually trigger a cache refresh
+   * @throws Exception
+   */
+  void triggerBrokerCacheUpdate()
+      throws Exception;
+
+  /**
+   * Closes the cache and release any resources
+   */
+  void close();
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
index 45c6db2002..69e0091803 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.util.List;
+import java.util.Map;
 import javax.inject.Inject;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -140,4 +141,20 @@ public class PinotTableInstances {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.NOT_FOUND);
     }
   }
+
+  @GET
+  @Path("/tables/livebrokers")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "List tables to live brokers mappings", notes = "List 
tables to live brokers mappings based "
+      + "on EV")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, 
message = "Internal server error")
+  })
+  public Map<String, List<InstanceInfo>> getLiveBrokers() {
+    try {
+      return _pinotHelixResourceManager.getTableToLiveBrokersMapping();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.NOT_FOUND);
+    }
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 2f9a43718f..f32e2e75e9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -44,6 +44,7 @@ import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.ws.rs.BadRequestException;
@@ -106,6 +107,7 @@ import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
 import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
 import org.apache.pinot.controller.api.resources.StateType;
 import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
@@ -3378,6 +3380,40 @@ public class PinotHelixResourceManager {
     return new TableStats(creationTime);
   }
 
+  /**
+   * Returns map of tableName to list of live brokers
+   * @return Map of tableName to list of ONLINE brokers serving the table
+   */
+  public Map<String, List<InstanceInfo>> getTableToLiveBrokersMapping() {
+    ExternalView ev = 
_helixDataAccessor.getProperty(_keyBuilder.externalView(Helix.BROKER_RESOURCE_INSTANCE));
+    if (ev == null) {
+      throw new IllegalStateException("Failed to find external view for " + 
Helix.BROKER_RESOURCE_INSTANCE);
+    }
+
+    // Map of instanceId -> InstanceConfig
+    Map<String, InstanceConfig> instanceConfigMap = 
HelixHelper.getInstanceConfigs(_helixZkManager)
+        .stream().collect(Collectors.toMap(InstanceConfig::getInstanceName, 
Function.identity()));
+
+    Map<String, List<InstanceInfo>> result = new HashMap<>();
+    ZNRecord znRecord = ev.getRecord();
+    for (Map.Entry<String, Map<String, String>> tableToBrokersEntry : 
znRecord.getMapFields().entrySet()) {
+      String tableName = tableToBrokersEntry.getKey();
+      Map<String, String> brokersToState = tableToBrokersEntry.getValue();
+      List<InstanceInfo> hosts = new ArrayList<>();
+      for (Map.Entry<String, String> brokerEntry : brokersToState.entrySet()) {
+        if ("ONLINE".equalsIgnoreCase(brokerEntry.getValue()) && 
instanceConfigMap.containsKey(brokerEntry.getKey())) {
+          InstanceConfig instanceConfig = 
instanceConfigMap.get(brokerEntry.getKey());
+          hosts.add(new InstanceInfo(instanceConfig.getInstanceName(), 
instanceConfig.getHostName(),
+              Integer.parseInt(instanceConfig.getPort())));
+        }
+      }
+      if (!hosts.isEmpty()) {
+        result.put(tableName, hosts);
+      }
+    }
+    return result;
+  }
+
   /**
    * Return the list of live brokers serving the corresponding table. Based on 
the
    * input tableName, there can be 3 cases:
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index d1a63347a6..996f5b521c 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -53,6 +53,7 @@ import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.helix.LeadControllerUtils;
 import org.apache.pinot.controller.ControllerTestUtils;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.instance.Instance;
@@ -1085,6 +1086,49 @@ public class PinotHelixResourceManagerTest {
     throw new RuntimeException("Timeout while waiting for segments to be 
deleted");
   }
 
+  @Test
+  public void testGetTableToLiveBrokersMapping()
+      throws IOException {
+    Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 2, 
0, 0);
+    PinotResourceManagerResponse response =
+        
ControllerTestUtils.getHelixResourceManager().createBrokerTenant(brokerTenant);
+    Assert.assertTrue(response.isSuccessful());
+    // Create the table
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        
.setNumReplicas(ControllerTestUtils.MIN_NUM_REPLICAS).setBrokerTenant(BROKER_TENANT_NAME)
+        .setServerTenant(SERVER_TENANT_NAME).build();
+    ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
+    // Introduce a wait here for the EV is updated with live brokers for a 
table.
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView externalView = 
ControllerTestUtils.getHelixResourceManager().getHelixAdmin()
+          .getResourceExternalView(ControllerTestUtils.getHelixClusterName(),
+              CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+      int onlineBrokersCnt = 0;
+      Map<String, String> brokerToStateMap = 
externalView.getStateMap(OFFLINE_TABLE_NAME);
+      if (brokerToStateMap == null) {
+        return false;
+      }
+      for (Map.Entry<String, String> entry : brokerToStateMap.entrySet()) {
+        if ("ONLINE".equalsIgnoreCase(entry.getValue())) {
+          onlineBrokersCnt++;
+        }
+      }
+      return onlineBrokersCnt == 2;
+    }, TIMEOUT_IN_MS, "");
+
+    Map<String, List<InstanceInfo>> tableToBrokersMapping =
+        
ControllerTestUtils.getHelixResourceManager().getTableToLiveBrokersMapping();
+
+    Assert.assertEquals(tableToBrokersMapping.size(), 1);
+    Assert.assertEquals(tableToBrokersMapping.get(OFFLINE_TABLE_NAME).size(), 
2);
+
+    // Delete the table
+    
ControllerTestUtils.getHelixResourceManager().deleteOfflineTable(TABLE_NAME);
+    
ControllerTestUtils.getHelixResourceManager().deleteRealtimeTable(TABLE_NAME);
+    // Clean up.
+    untagBrokers();
+  }
+
   @Test
   public void testGetLiveBrokersForTable()
       throws IOException, TableNotFoundException {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 2175994813..9f57222e2b 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -142,6 +142,10 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "brokers", "tables", "?state=" + 
state);
   }
 
+  public String forLiveBrokerTablesGet() {
+    return StringUtil.join("/", _baseUrl, "tables", "livebrokers");
+  }
+
   public String forBrokerTableGet(String table, String tableType, String 
state) {
     StringBuilder params = new StringBuilder();
     if (tableType != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to