This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 25aa780  Adding controller APIs to fetch brokers information (#5685)
25aa780 is described below

commit 25aa780814809c3290d278bb2136373865f5c947
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Tue Jul 14 16:25:58 2020 -0700

    Adding controller APIs to fetch brokers information (#5685)
    
    * Adding controller APIs to fetch brokers information
    
    * Address comments
    
    * Address comments
---
 .../pinot/controller/api/resources/Constants.java  |   1 +
 .../api/resources/PinotBrokerRestletResource.java  | 145 +++++++++++++++++
 .../helix/ControllerRequestURLBuilder.java         |  42 +++++
 .../api/PinotBrokerRestletResourceTest.java        | 171 +++++++++++++++++++++
 .../pinot/controller/helix/ControllerTest.java     |  10 ++
 5 files changed, 369 insertions(+)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 5e463e0..88658b0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -35,6 +35,7 @@ public class Constants {
   public static final String INSTANCE_TAG = "Instance";
   public static final String SCHEMA_TAG = "Schema";
   public static final String TENANT_TAG = "Tenant";
+  public static final String BROKER_TAG = "Broker";
   public static final String SEGMENT_TAG = "Segment";
   public static final String TASK_TAG = "Task";
   public static final String LEAD_CONTROLLER_TAG = "Leader";
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
new file mode 100644
index 0000000..3416769
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.google.common.collect.ImmutableList;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.BROKER_TAG)
+@Path("/")
+public class PinotBrokerRestletResource {
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(PinotBrokerRestletResource.class);
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/brokers")
+  @ApiOperation(value = "List tenants and tables to brokers mappings", notes = 
"List tenants and tables to brokers mappings")
+  public Map<String, Map<String, List<String>>> listBrokersMapping(
+      @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+    Map<String, Map<String, List<String>>> resultMap = new HashMap<>();
+    resultMap.put("tenants", getTenantsToBrokersMapping(state));
+    resultMap.put("tables", getTablesToBrokersMapping(state));
+    return resultMap;
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/brokers/tenants")
+  @ApiOperation(value = "List tenants to brokers mappings", notes = "List 
tenants to brokers mappings")
+  public Map<String, List<String>> getTenantsToBrokersMapping(
+      @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+    Map<String, List<String>> resultMap = new HashMap<>();
+    _pinotHelixResourceManager.getAllBrokerTenantNames().stream()
+        .forEach(tenant -> resultMap.put(tenant, getBrokersForTenant(tenant, 
state)));
+    return resultMap;
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/brokers/tenants/{tenantName}")
+  @ApiOperation(value = "List brokers for a given tenant", notes = "List 
brokers for a given tenant")
+  public List<String> getBrokersForTenant(
+      @ApiParam(value = "Name of the tenant", required = true) 
@PathParam("tenantName") String tenantName,
+      @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+    if 
(!_pinotHelixResourceManager.getAllBrokerTenantNames().contains(tenantName)) {
+      throw new ControllerApplicationException(LOGGER, String.format("Tenant 
'%s' not found.", tenantName),
+          Response.Status.NOT_FOUND);
+    }
+    Set<String> tenantBrokers = new 
HashSet<>(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(tenantName));
+    applyStateChanges(tenantBrokers, state);
+    return ImmutableList.copyOf(tenantBrokers);
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/brokers/tables")
+  @ApiOperation(value = "List tables to brokers mappings", notes = "List 
tables to brokers mappings")
+  public Map<String, List<String>> getTablesToBrokersMapping(
+      @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+    Map<String, List<String>> resultMap = new HashMap<>();
+    _pinotHelixResourceManager.getAllRawTables().stream()
+        .forEach(table -> resultMap.put(table, getBrokersForTable(table, null, 
state)));
+    return resultMap;
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/brokers/tables/{tableName}")
+  @ApiOperation(value = "List brokers for a given table", notes = "List 
brokers for a given table")
+  public List<String> getBrokersForTable(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
+      @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+    try {
+      List<String> tableNamesWithType = _pinotHelixResourceManager
+          .getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+      if (tableNamesWithType.isEmpty()) {
+        throw new ControllerApplicationException(LOGGER, String.format("Table 
'%s' not found.", tableName),
+            Response.Status.NOT_FOUND);
+      }
+      Set<String> tableBrokers =
+          new 
HashSet<>(_pinotHelixResourceManager.getBrokerInstancesFor(tableNamesWithType.get(0)));
+      applyStateChanges(tableBrokers, state);
+      return ImmutableList.copyOf(tableBrokers);
+    } catch (TableNotFoundException e) {
+      throw new ControllerApplicationException(LOGGER, String.format("Table 
'%s' not found.", tableName),
+          Response.Status.NOT_FOUND);
+    } catch (IllegalArgumentException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.FORBIDDEN);
+    }
+  }
+
+  private void applyStateChanges(Set<String> brokers, String state) {
+    if (state == null) {
+      return;
+    }
+    switch (state) {
+      case CommonConstants.Helix.StateModel.BrokerResourceStateModel.ONLINE:
+        brokers.retainAll(_pinotHelixResourceManager.getOnlineInstanceList());
+        break;
+      case CommonConstants.Helix.StateModel.BrokerResourceStateModel.OFFLINE:
+        brokers.removeAll(_pinotHelixResourceManager.getOnlineInstanceList());
+        break;
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
index 802d085..069b91b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -90,6 +90,48 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "tenants", tenantName, 
"?type=server");
   }
 
+  /**
+   * Builds the URL of the {@code brokers} endpoint.
+   *
+   * @param state value for the {@code state} query parameter, or {@code null} to omit the query string
+   */
+  public String forBrokersGet(String state) {
+    return state == null ? StringUtil.join("/", _baseUrl, "brokers")
+        : StringUtil.join("/", _baseUrl, "brokers", "?state=" + state);
+  }
+
+  /**
+   * Builds the URL of the {@code brokers/tenants} endpoint.
+   *
+   * @param state value for the {@code state} query parameter, or {@code null} to omit the query string
+   */
+  public String forBrokerTenantsGet(String state) {
+    return state == null ? StringUtil.join("/", _baseUrl, "brokers", "tenants")
+        : StringUtil.join("/", _baseUrl, "brokers", "tenants", "?state=" + state);
+  }
+
+  /**
+   * Builds the URL of the {@code brokers/tenants/<tenant>} endpoint.
+   *
+   * @param tenant tenant name used as the last path segment
+   * @param state value for the {@code state} query parameter, or {@code null} to omit the query string
+   */
+  public String forBrokerTenantGet(String tenant, String state) {
+    return state == null ? StringUtil.join("/", _baseUrl, "brokers", "tenants", tenant)
+        : StringUtil.join("/", _baseUrl, "brokers", "tenants", tenant, "?state=" + state);
+  }
+
+  /**
+   * Builds the URL of the {@code brokers/tables} endpoint.
+   *
+   * @param state value for the {@code state} query parameter, or {@code null} to omit the query string
+   */
+  public String forBrokerTablesGet(String state) {
+    return state == null ? StringUtil.join("/", _baseUrl, "brokers", "tables")
+        : StringUtil.join("/", _baseUrl, "brokers", "tables", "?state=" + state);
+  }
+
+  /**
+   * Builds the URL of the {@code brokers/tables/<table>} endpoint.
+   *
+   * @param table table name used as the last path segment
+   * @param tableType value for the {@code type} query parameter, or {@code null} to omit it
+   * @param state value for the {@code state} query parameter, or {@code null} to omit it
+   * @return the URL; the query string is {@code ?type=..}, {@code ?state=..}, {@code ?type=..&state=..} or empty
+   */
+  public String forBrokerTableGet(String table, String tableType, String state) {
+    StringBuilder params = new StringBuilder();
+    if (tableType != null) {
+      params.append("?type=").append(tableType);
+    }
+    if (state != null) {
+      // '?' only introduces the query string; later parameters are joined with '&'. Emitting "&?state=" would
+      // name the parameter "?state", so the state filter would never reach the server.
+      params.append(params.length() > 0 ? "&" : "?").append("state=").append(state);
+    }
+    return StringUtil.join("/", _baseUrl, "brokers", "tables", table, params.toString());
+  }
+
   public String forTableCreate() {
     return StringUtil.join("/", _baseUrl, "tables");
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceTest.java
new file mode 100644
index 0000000..9101f1e
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class PinotBrokerRestletResourceTest extends ControllerTest {
+  private static final String TABLE_NAME_1 = "testTable1";
+  private static final String TABLE_NAME_2 = "testTable2";
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    startZk();
+    startController();
+    addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_OFFLINE").size(),
+        1);
+  }
+
+  public void testGetBrokersHelper(String state, int onlineServers, int 
offlineServers)
+      throws Exception {
+    List<String> expectedBrokers = new ArrayList<>();
+    if (state == null) {
+      for (int i = 0; i < onlineServers + offlineServers; i++) {
+        expectedBrokers.add("Broker_localhost_" + i);
+      }
+    } else {
+      switch (state) {
+        case "OFFLINE":
+          for (int i = onlineServers; i < onlineServers + offlineServers; i++) 
{
+            expectedBrokers.add("Broker_localhost_" + i);
+          }
+          break;
+        default:
+          for (int i = 0; i < onlineServers; i++) {
+            expectedBrokers.add("Broker_localhost_" + i);
+          }
+          break;
+      }
+    }
+    Map<String, Map<String, List<String>>> allMap =
+        
JsonUtils.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokersGet(state)),
 Map.class);
+
+    for (String expectedBroker : expectedBrokers) {
+      
Assert.assertTrue(allMap.get("tenants").get("DefaultTenant").contains(expectedBroker));
+      
Assert.assertTrue(allMap.get("tables").get("testTable1").contains(expectedBroker));
+      
Assert.assertTrue(allMap.get("tables").get("testTable2").contains(expectedBroker));
+    }
+
+    Map<String, List<String>> tenantsMap =
+        
JsonUtils.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTenantsGet(state)),
 Map.class);
+    for (String expectedBroker : expectedBrokers) {
+      
Assert.assertTrue(tenantsMap.get("DefaultTenant").contains(expectedBroker));
+    }
+
+    List<String> tenantBrokers = JsonUtils
+        
.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTenantGet("DefaultTenant",
 state)),
+            List.class);
+    for (String expectedBroker : expectedBrokers) {
+      Assert.assertTrue(tenantBrokers.contains(expectedBroker));
+    }
+
+    try {
+      
sendGetRequest(_controllerRequestURLBuilder.forBrokerTenantGet("nonExistTenant",
 state));
+      Assert.fail("Shouldn't reach here");
+    } catch (Exception e) {
+    }
+
+    Map<String, List<String>> tablesMap =
+        
JsonUtils.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTablesGet(state)),
 Map.class);
+    for (String expectedBroker : expectedBrokers) {
+      Assert.assertTrue(tablesMap.get("testTable1").contains(expectedBroker));
+      Assert.assertTrue(tablesMap.get("testTable2").contains(expectedBroker));
+    }
+
+    List<String> tableBrokers = JsonUtils
+        
.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable1",
 "OFFLINE", state)),
+            List.class);
+    for (String expectedBroker : expectedBrokers) {
+      Assert.assertTrue(tableBrokers.contains(expectedBroker));
+    }
+    tableBrokers = JsonUtils
+        
.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable1",
 null, state)),
+            List.class);
+    for (String expectedBroker : expectedBrokers) {
+      Assert.assertTrue(tableBrokers.contains(expectedBroker));
+    }
+    tableBrokers = JsonUtils
+        
.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable2",
 "OFFLINE", state)),
+            List.class);
+    for (String expectedBroker : expectedBrokers) {
+      Assert.assertTrue(tableBrokers.contains(expectedBroker));
+    }
+    tableBrokers = JsonUtils
+        
.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable2",
 null, state)),
+            List.class);
+    for (String expectedBroker : expectedBrokers) {
+      Assert.assertTrue(tableBrokers.contains(expectedBroker));
+    }
+    try {
+      
sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("nonExistTable", 
null, state));
+      Assert.fail("Shouldn't reach here");
+    } catch (Exception e) {
+    }
+  }
+
+  @Test
+  public void testGetBrokers()
+      throws Exception {
+    addFakeBrokerInstancesToAutoJoinHelixCluster(10, true);
+    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_BROKER").size(),
+        10);
+
+    // Adding table
+    _helixResourceManager
+        .addTable(new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setNumReplicas(1).build());
+    _helixResourceManager
+        .addTable(new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_2).setNumReplicas(1).build());
+
+    // Wait for the table addition
+    while (!_helixResourceManager.hasOfflineTable(TABLE_NAME_1) && 
!_helixResourceManager
+        .hasOfflineTable(TABLE_NAME_2)) {
+      Thread.sleep(100);
+    }
+
+    testGetBrokersHelper(null, 10, 0);
+    testGetBrokersHelper("ONLINE", 10, 0);
+    testGetBrokersHelper("OFFLINE", 10, 0);
+    for (int i = 9; i >= 0; i--) {
+      stopFakeInstance(BROKER_INSTANCE_ID_PREFIX + i);
+      testGetBrokersHelper(null, i, 10 - i);
+      testGetBrokersHelper("ONLINE", i, 10 - i);
+      testGetBrokersHelper("OFFLINE", i, 10 - i);
+    }
+  }
+
+  @AfterClass
+  public void tearDown() {
+    stopFakeInstances();
+    stopController();
+    stopZk();
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 76c75fa..0513566 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -396,6 +396,16 @@ public abstract class ControllerTest {
     _fakeInstanceHelixManagers.clear();
   }
 
+  /**
+   * Disconnects the first fake instance whose name equals {@code instanceId} (ignoring case) and removes it from
+   * the tracked fake instances. Does nothing when no instance matches.
+   */
+  protected void stopFakeInstance(String instanceId) {
+    // Locate the match first; the collection is only modified once iteration has finished.
+    HelixManager target = null;
+    for (HelixManager candidate : _fakeInstanceHelixManagers) {
+      if (candidate.getInstanceName().equalsIgnoreCase(instanceId)) {
+        target = candidate;
+        break;
+      }
+    }
+    if (target != null) {
+      target.disconnect();
+      _fakeInstanceHelixManagers.remove(target);
+    }
+  }
+
   protected Schema createDummySchema(String tableName) {
     Schema schema = new Schema();
     schema.setSchemaName(tableName);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to