This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 25aa780 Adding controller APIs to fetch brokers information (#5685) 25aa780 is described below commit 25aa780814809c3290d278bb2136373865f5c947 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Tue Jul 14 16:25:58 2020 -0700 Adding controller APIs to fetch brokers information (#5685) * Adding controller APIs to fetch brokers information * Address comments * Address comments --- .../pinot/controller/api/resources/Constants.java | 1 + .../api/resources/PinotBrokerRestletResource.java | 145 +++++++++++++++++ .../helix/ControllerRequestURLBuilder.java | 42 +++++ .../api/PinotBrokerRestletResourceTest.java | 171 +++++++++++++++++++++ .../pinot/controller/helix/ControllerTest.java | 10 ++ 5 files changed, 369 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java index 5e463e0..88658b0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java @@ -35,6 +35,7 @@ public class Constants { public static final String INSTANCE_TAG = "Instance"; public static final String SCHEMA_TAG = "Schema"; public static final String TENANT_TAG = "Tenant"; + public static final String BROKER_TAG = "Broker"; public static final String SEGMENT_TAG = "Segment"; public static final String TASK_TAG = "Task"; public static final String LEAD_CONTROLLER_TAG = "Leader"; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java new file mode 100644 index 0000000..3416769 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.google.common.collect.ImmutableList; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.common.exception.TableNotFoundException; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@Api(tags = Constants.BROKER_TAG) +@Path("/") +public class PinotBrokerRestletResource { + public static final Logger LOGGER = LoggerFactory.getLogger(PinotBrokerRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/brokers") + @ApiOperation(value = "List tenants and tables to brokers mappings", notes = "List tenants and tables to brokers mappings") + public Map<String, Map<String, List<String>>> listBrokersMapping( + @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) { + Map<String, Map<String, List<String>>> resultMap = new HashMap<>(); + resultMap.put("tenants", getTenantsToBrokersMapping(state)); + resultMap.put("tables", getTablesToBrokersMapping(state)); + return resultMap; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/brokers/tenants") + @ApiOperation(value = "List tenants to brokers mappings", notes = "List tenants to brokers mappings") + public Map<String, List<String>> getTenantsToBrokersMapping( + @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) { + Map<String, List<String>> resultMap = new HashMap<>(); + _pinotHelixResourceManager.getAllBrokerTenantNames().stream() + .forEach(tenant -> resultMap.put(tenant, getBrokersForTenant(tenant, state))); + return resultMap; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/brokers/tenants/{tenantName}") + @ApiOperation(value = "List brokers for a given tenant", notes = "List brokers for a given tenant") + public List<String> getBrokersForTenant( + @ApiParam(value = "Name of the tenant", required = true) @PathParam("tenantName") String tenantName, + @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) { + if (!_pinotHelixResourceManager.getAllBrokerTenantNames().contains(tenantName)) { + throw new ControllerApplicationException(LOGGER, String.format("Tenant '%s' not found.", tenantName), + Response.Status.NOT_FOUND); + } + Set<String> tenantBrokers = new HashSet<>(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(tenantName)); + applyStateChanges(tenantBrokers, state); + return ImmutableList.copyOf(tenantBrokers); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/brokers/tables") + @ApiOperation(value = "List tables to brokers mappings", notes = "List tables to brokers mappings") + public Map<String, List<String>> getTablesToBrokersMapping( + @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) { + Map<String, List<String>> resultMap = new HashMap<>(); + _pinotHelixResourceManager.getAllRawTables().stream() + .forEach(table -> resultMap.put(table, getBrokersForTable(table, null, state))); + return resultMap; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/brokers/tables/{tableName}") + @ApiOperation(value = "List brokers for a given table", notes = "List brokers for a given table") + public List<String> getBrokersForTable( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, + @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) { + try { + List<String> tableNamesWithType = _pinotHelixResourceManager + .getExistingTableNamesWithType(tableName, Constants.validateTableType(tableTypeStr)); + if (tableNamesWithType.isEmpty()) { + throw new ControllerApplicationException(LOGGER, String.format("Table '%s' not found.", tableName), + Response.Status.NOT_FOUND); + } + Set<String> tableBrokers = + new HashSet<>(_pinotHelixResourceManager.getBrokerInstancesFor(tableNamesWithType.get(0))); + applyStateChanges(tableBrokers, state); + return ImmutableList.copyOf(tableBrokers); + } catch (TableNotFoundException e) { + throw new ControllerApplicationException(LOGGER, String.format("Table '%s' not found.", tableName), + Response.Status.NOT_FOUND); + } catch (IllegalArgumentException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.FORBIDDEN); + } + } + + private void applyStateChanges(Set<String> brokers, String state) { + if (state == null) { + return; + } + switch (state) { + case CommonConstants.Helix.StateModel.BrokerResourceStateModel.ONLINE: + brokers.retainAll(_pinotHelixResourceManager.getOnlineInstanceList()); + break; + case CommonConstants.Helix.StateModel.BrokerResourceStateModel.OFFLINE: + brokers.removeAll(_pinotHelixResourceManager.getOnlineInstanceList()); + break; + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java index 802d085..069b91b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java @@ -90,6 +90,48 @@ public class ControllerRequestURLBuilder { return StringUtil.join("/", _baseUrl, "tenants", tenantName, "?type=server"); } + public String forBrokersGet(String state) { + if (state == null) { + return StringUtil.join("/", _baseUrl, "brokers"); + } + return StringUtil.join("/", _baseUrl, "brokers", "?state=" + state); + } + + public String forBrokerTenantsGet(String state) { + if (state == null) { + return StringUtil.join("/", _baseUrl, "brokers", "tenants"); + } + return StringUtil.join("/", _baseUrl, "brokers", "tenants", "?state=" + state); + } + + public String forBrokerTenantGet(String tenant, String state) { + if (state == null) { + return StringUtil.join("/", _baseUrl, "brokers", "tenants", tenant); + } + return StringUtil.join("/", _baseUrl, "brokers", "tenants", tenant, "?state=" + state); + } + + public String forBrokerTablesGet(String state) { + if (state == null) { + return StringUtil.join("/", _baseUrl, "brokers", "tables"); + } + return StringUtil.join("/", _baseUrl, "brokers", "tables", "?state=" + state); + } + + public String forBrokerTableGet(String table, String tableType, String state) { + StringBuilder params = new StringBuilder(); + if (tableType != null) { + params.append("?type=" + tableType); + } + if (state != null) { + if (params.length() > 0) { + params.append("&"); + } + params.append("?state=" + state); + } + return StringUtil.join("/", _baseUrl, "brokers", "tables", table, params.toString()); + } + public String forTableCreate() { return StringUtil.join("/", _baseUrl, "tables"); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceTest.java new file mode 100644 index 0000000..9101f1e --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class PinotBrokerRestletResourceTest extends ControllerTest { + private static final String TABLE_NAME_1 = "testTable1"; + private static final String TABLE_NAME_2 = "testTable2"; + + @BeforeClass + public void setUp() + throws Exception { + startZk(); + startController(); + addFakeServerInstancesToAutoJoinHelixCluster(1, true); + Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_OFFLINE").size(), + 1); + } + + public void testGetBrokersHelper(String state, int onlineServers, int offlineServers) + throws Exception { + List<String> expectedBrokers = new ArrayList<>(); + if (state == null) { + for (int i = 0; i < onlineServers + offlineServers; i++) { + expectedBrokers.add("Broker_localhost_" + i); + } + } else { + switch (state) { + case "OFFLINE": + for (int i = onlineServers; i < onlineServers + offlineServers; i++) { + expectedBrokers.add("Broker_localhost_" + i); + } + break; + default: + for (int i = 0; i < onlineServers; i++) { + expectedBrokers.add("Broker_localhost_" + i); + } + break; + } + } + Map<String, Map<String, List<String>>> allMap = + JsonUtils.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokersGet(state)), Map.class); + + for (String expectedBroker : expectedBrokers) { + Assert.assertTrue(allMap.get("tenants").get("DefaultTenant").contains(expectedBroker)); + Assert.assertTrue(allMap.get("tables").get("testTable1").contains(expectedBroker)); + Assert.assertTrue(allMap.get("tables").get("testTable2").contains(expectedBroker)); + } + + Map<String, List<String>> tenantsMap = + JsonUtils.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTenantsGet(state)), Map.class); + for (String expectedBroker : expectedBrokers) { + Assert.assertTrue(tenantsMap.get("DefaultTenant").contains(expectedBroker)); + } + + List<String> tenantBrokers = JsonUtils + .stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTenantGet("DefaultTenant", state)), + List.class); + for (String expectedBroker : expectedBrokers) { + Assert.assertTrue(tenantBrokers.contains(expectedBroker)); + } + + try { + sendGetRequest(_controllerRequestURLBuilder.forBrokerTenantGet("nonExistTenant", state)); + Assert.fail("Shouldn't reach here"); + } catch (Exception e) { + } + + Map<String, List<String>> tablesMap = + JsonUtils.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTablesGet(state)), Map.class); + for (String expectedBroker : expectedBrokers) { + Assert.assertTrue(tablesMap.get("testTable1").contains(expectedBroker)); + Assert.assertTrue(tablesMap.get("testTable2").contains(expectedBroker)); + } + + List<String> tableBrokers = JsonUtils + .stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable1", "OFFLINE", state)), + List.class); + for (String expectedBroker : expectedBrokers) { + Assert.assertTrue(tableBrokers.contains(expectedBroker)); + } + tableBrokers = JsonUtils + .stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable1", null, state)), + List.class); + for (String expectedBroker : expectedBrokers) { + Assert.assertTrue(tableBrokers.contains(expectedBroker)); + } + tableBrokers = JsonUtils + .stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable2", "OFFLINE", state)), + List.class); + for (String expectedBroker : expectedBrokers) { + Assert.assertTrue(tableBrokers.contains(expectedBroker)); + } + tableBrokers = JsonUtils + .stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable2", null, state)), + List.class); + for (String expectedBroker : expectedBrokers) { + Assert.assertTrue(tableBrokers.contains(expectedBroker)); + } + try { + sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("nonExistTable", null, state)); + Assert.fail("Shouldn't reach here"); + } catch (Exception e) { + } + } + + @Test + public void testGetBrokers() + throws Exception { + addFakeBrokerInstancesToAutoJoinHelixCluster(10, true); + Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), + 10); + + // Adding table + _helixResourceManager + .addTable(new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setNumReplicas(1).build()); + _helixResourceManager + .addTable(new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_2).setNumReplicas(1).build()); + + // Wait for the table addition + while (!_helixResourceManager.hasOfflineTable(TABLE_NAME_1) && !_helixResourceManager + .hasOfflineTable(TABLE_NAME_2)) { + Thread.sleep(100); + } + + testGetBrokersHelper(null, 10, 0); + testGetBrokersHelper("ONLINE", 10, 0); + testGetBrokersHelper("OFFLINE", 10, 0); + for (int i = 9; i >= 0; i--) { + stopFakeInstance(BROKER_INSTANCE_ID_PREFIX + i); + testGetBrokersHelper(null, i, 10 - i); + testGetBrokersHelper("ONLINE", i, 10 - i); + testGetBrokersHelper("OFFLINE", i, 10 - i); + } + } + + @AfterClass + public void tearDown() { + stopFakeInstances(); + stopController(); + stopZk(); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 76c75fa..0513566 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -396,6 +396,16 @@ public abstract class ControllerTest { _fakeInstanceHelixManagers.clear(); } + protected void stopFakeInstance(String instanceId) { + for (HelixManager helixManager : _fakeInstanceHelixManagers) { + if (helixManager.getInstanceName().equalsIgnoreCase(instanceId)) { + helixManager.disconnect(); + _fakeInstanceHelixManagers.remove(helixManager); + return; + } + } + } + protected Schema createDummySchema(String tableName) { Schema schema = new Schema(); schema.setSchemaName(tableName); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org