This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 41a3fc4 API to get status of consumption of a table (#6322) 41a3fc4 is described below commit 41a3fc4f7639c879a2c76e9c1e29efeaeac6e58e Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Mon Dec 7 10:20:15 2020 -0800 API to get status of consumption of a table (#6322) API to get status of consumption of a table. This includes 1. consumer status 2. partition to offsets map 3. server name 4. last consumed timestamp, for consuming segments across all servers. --- .../restlet/resources/SegmentConsumerInfo.java | 61 ++++ .../api/resources/PinotSegmentRestletResource.java | 29 ++ .../helix/core/PinotHelixResourceManager.java | 18 ++ .../util/ConsumingSegmentInfoReader.java | 167 +++++++++++ .../api/ConsumingSegmentInfoReaderTest.java | 319 +++++++++++++++++++++ .../realtime/HLRealtimeSegmentDataManager.java | 16 ++ .../realtime/LLRealtimeSegmentDataManager.java | 17 ++ .../realtime/RealtimeSegmentDataManager.java | 21 ++ .../pinot/server/api/resources/TablesResource.java | 41 +++ 9 files changed, 689 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java new file mode 100644 index 0000000..83e0433 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.restlet.resources; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; + + +/** + * Information regarding the consumer of a segment + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class SegmentConsumerInfo { + private final String _segmentName; + private final String _consumerState; + private final long _lastConsumedTimestamp; + private final Map<String, String> _partitionToOffsetMap; + + public SegmentConsumerInfo(@JsonProperty("segmentName") String segmentName, + @JsonProperty("consumerState") String consumerState, + @JsonProperty("lastConsumedTimestamp") long lastConsumedTimestamp, + @JsonProperty("partitionToOffsetMap") Map<String, String> partitionToOffsetMap) { + _segmentName = segmentName; + _consumerState = consumerState; + _lastConsumedTimestamp = lastConsumedTimestamp; + _partitionToOffsetMap = partitionToOffsetMap; + } + + public String getSegmentName() { + return _segmentName; + } + + public String getConsumerState() { + return _consumerState; + } + + public long getLastConsumedTimestamp() { + return _lastConsumedTimestamp; + } + + public Map<String, String> getPartitionToOffsetMap() { + return _partitionToOffsetMap; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index 876d3a9..b62ce61 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.JsonNode; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -34,6 +36,7 @@ import javax.annotation.Nullable; import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -58,7 +61,9 @@ import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; +import org.apache.pinot.controller.util.ConsumingSegmentInfoReader; import org.apache.pinot.controller.util.TableMetadataReader; +import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -542,4 +547,28 @@ public class PinotSegmentRestletResource { return tableMetadataReader.getSegmentsMetadata(tableNameWithType, _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); } + + @GET + @Path("/tables/{realtimeTableName}/consumingSegmentsInfo") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Returns state of consuming segments", notes = "Gets the status of consumers from all servers") + @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 404, message = "Table not found"), 
@ApiResponse(code = 400, message = "Bad request"), @ApiResponse(code = 500, message = "Internal server error")}) + public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsInfo( + @ApiParam(value = "Realtime table name with or without type", required = true, example = "myTable | myTable_REALTIME") @PathParam("realtimeTableName") String realtimeTableName) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName); + if (TableType.OFFLINE == tableType) { + throw new ControllerApplicationException(LOGGER, "Cannot get consuming segments info for OFFLINE table: " + realtimeTableName, Response.Status.BAD_REQUEST); + } + try { + String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName); + ConsumingSegmentInfoReader consumingSegmentInfoReader = + new ConsumingSegmentInfoReader(_executor, _connectionManager, _pinotHelixResourceManager); + return consumingSegmentInfoReader + .getConsumingSegmentsInfo(tableNameWithType, _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to get consuming segments info for table %s. %s", realtimeTableName, e.getMessage()), + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 9718d3f..0c99037 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1895,6 +1895,24 @@ public class PinotHelixResourceManager { return serverToSegmentsMap; } + /** + * Returns a set of CONSUMING segments for the given realtime table.
+ */ + public Set<String> getConsumingSegments(String tableNameWithType) { + IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); + if (idealState == null) { + throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType); + } + Set<String> consumingSegments = new HashSet<>(); + for (String segment : idealState.getPartitionSet()) { + Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segment); + if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) { + consumingSegments.add(segment); + } + } + return consumingSegments; + } + public synchronized Map<String, String> getSegmentsCrcForTable(String tableNameWithType) { // Get the segment list for this table IdealState is = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java new file mode 100644 index 0000000..e440d8b --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.util; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.BiMap; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Executor; +import org.apache.commons.httpclient.HttpConnectionManager; +import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This is a helper class that calls the server API endpoints to fetch consuming segments info + * Only the servers returning success are returned by the method. For servers returning errors (http error or otherwise), + * no entry is created in the return list + */ +public class ConsumingSegmentInfoReader { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentInfoReader.class); + + private final Executor _executor; + private final HttpConnectionManager _connectionManager; + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public ConsumingSegmentInfoReader(Executor executor, HttpConnectionManager connectionManager, + PinotHelixResourceManager helixResourceManager) { + _executor = executor; + _connectionManager = connectionManager; + _pinotHelixResourceManager = helixResourceManager; + } + + /** + * This method retrieves the consuming segments info for a given realtime table. 
+ * @return a map of segmentName to the information about its consumer + */ + public ConsumingSegmentsInfoMap getConsumingSegmentsInfo(String tableNameWithType, int timeoutMs) + throws InvalidConfigException { + final Map<String, List<String>> serverToSegments = + _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); + BiMap<String, String> serverToEndpoints = + _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet()); + + // Gets info for segments with LLRealtimeSegmentDataManager found in the table data manager + Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap = + getConsumingSegmentsInfoFromServers(tableNameWithType, serverToEndpoints, timeoutMs); + TreeMap<String, List<ConsumingSegmentInfo>> consumingSegmentInfoMap = new TreeMap<>(); + for (Map.Entry<String, List<SegmentConsumerInfo>> entry : serverToSegmentConsumerInfoMap.entrySet()) { + String serverName = entry.getKey(); + for (SegmentConsumerInfo info : entry.getValue()) { + consumingSegmentInfoMap.computeIfAbsent(info.getSegmentName(), k -> new ArrayList<>()).add( + new ConsumingSegmentInfo(serverName, info.getConsumerState(), info.getLastConsumedTimestamp(), + info.getPartitionToOffsetMap())); + } + } + // Segments which are in CONSUMING state but found no consumer on the server + Set<String> consumingSegments = _pinotHelixResourceManager.getConsumingSegments(tableNameWithType); + consumingSegments.forEach(c -> consumingSegmentInfoMap.putIfAbsent(c, Collections.emptyList())); + return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap); + } + + /** + * This method makes a MultiGet call to all servers to get the consuming segments info. 
+ * @return servers queried and a list of consumer status information for consuming segments on that server + */ + private Map<String, List<SegmentConsumerInfo>> getConsumingSegmentsInfoFromServers(String tableNameWithType, + BiMap<String, String> serverToEndpoints, int timeoutMs) { + LOGGER.info("Reading consuming segment info from servers: {} for table: {}", serverToEndpoints.keySet(), + tableNameWithType); + + List<String> serverUrls = new ArrayList<>(serverToEndpoints.size()); + BiMap<String, String> endpointsToServers = serverToEndpoints.inverse(); + for (String endpoint : endpointsToServers.keySet()) { + String consumingSegmentInfoURI = generateServerURL(tableNameWithType, endpoint); + serverUrls.add(consumingSegmentInfoURI); + } + + CompletionServiceHelper completionServiceHelper = + new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers); + CompletionServiceHelper.CompletionServiceResponse serviceResponse = + completionServiceHelper.doMultiGetRequest(serverUrls, tableNameWithType, timeoutMs); + Map<String, List<SegmentConsumerInfo>> serverToConsumingSegmentInfoList = new HashMap<>(); + int failedParses = 0; + for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) { + try { + List<SegmentConsumerInfo> segmentConsumerInfos = + JsonUtils.stringToObject(streamResponse.getValue(), new TypeReference<List<SegmentConsumerInfo>>() { + }); + serverToConsumingSegmentInfoList.put(streamResponse.getKey(), segmentConsumerInfos); + } catch (IOException e) { + failedParses++; + LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e); + } + } + if (failedParses != 0) { + LOGGER.warn("Failed to parse {} / {} consuming segment info responses from servers.", failedParses, serverUrls.size()); + } + return serverToConsumingSegmentInfoList; + } + + private String generateServerURL(String tableNameWithType, String endpoint) { + return
String.format("http://%s/tables/%s/consumingSegmentsInfo", endpoint, tableNameWithType); + } + + /** + * Map containing all consuming segments and their status information + */ + @JsonIgnoreProperties(ignoreUnknown = true) + static public class ConsumingSegmentsInfoMap { + public TreeMap<String, List<ConsumingSegmentInfo>> _segmentToConsumingInfoMap; + + public ConsumingSegmentsInfoMap( + @JsonProperty("segmentToConsumingInfoMap") TreeMap<String, List<ConsumingSegmentInfo>> segmentToConsumingInfoMap) { + this._segmentToConsumingInfoMap = segmentToConsumingInfoMap; + } + } + + /** + * Contains all the information about a consuming segment + */ + @JsonIgnoreProperties(ignoreUnknown = true) + static public class ConsumingSegmentInfo { + public String _serverName; + public String _consumerState; + public long _lastConsumedTimestamp; + public Map<String, String> _partitionToOffsetMap; + + public ConsumingSegmentInfo(@JsonProperty("serverName") String serverName, + @JsonProperty("consumerState") String consumerState, + @JsonProperty("lastConsumedTimestamp") long lastConsumedTimestamp, + @JsonProperty("partitionToOffsetMap") Map<String, String> partitionToOffsetMap) { + _serverName = serverName; + _consumerState = consumerState; + _lastConsumedTimestamp = lastConsumedTimestamp; + _partitionToOffsetMap = partitionToOffsetMap; + } + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java new file mode 100644 index 0000000..e09c255 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import org.apache.commons.httpclient.HttpConnectionManager; +import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; +import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.util.ConsumingSegmentInfoReader; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.ConsumerState; +import org.apache.pinot.spi.utils.JsonUtils; +import org.mockito.ArgumentMatchers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; 
+import org.testng.annotations.Test; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Tests the {@link ConsumingSegmentInfoReader} + */ +public class ConsumingSegmentInfoReaderTest { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentInfoReaderTest.class); + private final Executor executor = Executors.newFixedThreadPool(1); + private final HttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager(); + private PinotHelixResourceManager helix; + private final Map<String, FakeConsumingInfoServer> serverMap = new HashMap<>(); + private final int timeoutMsec = 10000; + + private static final String TABLE_NAME = "myTable_REALTIME"; + private static final String SEGMENT_NAME_PARTITION_0 = "table__0__29__12345"; + private static final String SEGMENT_NAME_PARTITION_1 = "table__1__32__12345"; + + @BeforeClass + public void setUp() + throws IOException { + helix = mock(PinotHelixResourceManager.class); + String uriPath = "/tables/"; + + // server0 - 1 consumer each for p0 and p1. CONSUMING. + Map<String, String> partitionToOffset0 = new HashMap<>(); + partitionToOffset0.put("0", "150"); + Map<String, String> partitionToOffset1 = new HashMap<>(); + partitionToOffset1.put("1", "150"); + FakeConsumingInfoServer s0 = new FakeConsumingInfoServer(Lists + .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0), + new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1))); + s0.start(uriPath, createHandler(200, s0.consumerInfos, 0)); + serverMap.put("server0", s0); + + // server1 - 1 consumer each for p0 and p1. CONSUMING. 
+ FakeConsumingInfoServer s1 = new FakeConsumingInfoServer(Lists + .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0), + new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1))); + s1.start(uriPath, createHandler(200, s1.consumerInfos, 0)); + serverMap.put("server1", s1); + + // server2 - p1 consumer CONSUMING. p0 consumer NOT_CONSUMING + FakeConsumingInfoServer s2 = new FakeConsumingInfoServer(Lists + .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "NOT_CONSUMING", 0, partitionToOffset0), + new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1))); + s2.start(uriPath, createHandler(200, s2.consumerInfos, 0)); + serverMap.put("server2", s2); + + // server3 - 1 consumer for p1. No consumer for p0 + FakeConsumingInfoServer s3 = new FakeConsumingInfoServer( + Lists.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1))); + s3.start(uriPath, createHandler(200, s3.consumerInfos, 0)); + serverMap.put("server3", s3); + + // server4 - unreachable/error/timeout + FakeConsumingInfoServer s4 = new FakeConsumingInfoServer(Lists + .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0), + new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1))); + s4.start(uriPath, createHandler(200, s4.consumerInfos, timeoutMsec * 1000)); + serverMap.put("server4", s4); + } + + @AfterClass + public void tearDown() { + for (Map.Entry<String, FakeConsumingInfoServer> fakeServerEntry : serverMap.entrySet()) { + fakeServerEntry.getValue().httpServer.stop(0); + } + } + + private HttpHandler createHandler(final int status, final List<SegmentConsumerInfo> consumerInfos, + final int sleepTimeMs) { + return httpExchange -> { + if (sleepTimeMs > 0) { + try { + Thread.sleep(sleepTimeMs); + } catch (InterruptedException e) { + LOGGER.info("Handler interrupted 
during sleep"); + } + } + String json = JsonUtils.objectToString(consumerInfos); + httpExchange.sendResponseHeaders(status, json.length()); + OutputStream responseBody = httpExchange.getResponseBody(); + responseBody.write(json.getBytes()); + responseBody.close(); + }; + } + + /** + * Server to return fake consuming segment info + */ + private static class FakeConsumingInfoServer { + String endpoint; + InetSocketAddress socket = new InetSocketAddress(0); + List<SegmentConsumerInfo> consumerInfos; + HttpServer httpServer; + + FakeConsumingInfoServer(List<SegmentConsumerInfo> consumerInfos) { + this.consumerInfos = consumerInfos; + } + + private void start(String path, HttpHandler handler) + throws IOException { + httpServer = HttpServer.create(socket, 0); + httpServer.createContext(path, handler); + new Thread(() -> httpServer.start()).start(); + endpoint = "localhost:" + httpServer.getAddress().getPort(); + } + } + + private Map<String, List<String>> subsetOfServerSegments(String... servers) { + Map<String, List<String>> subset = new HashMap<>(); + for (String server : servers) { + subset.put(server, serverMap.get(server).consumerInfos.stream().map(SegmentConsumerInfo::getSegmentName) + .collect(Collectors.toList())); + } + return subset; + } + + private BiMap<String, String> serverEndpoints(String... 
servers) { + BiMap<String, String> endpoints = HashBiMap.create(servers.length); + for (String server : servers) { + endpoints.put(server, serverMap.get(server).endpoint); + } + return endpoints; + } + + private ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner(final String[] servers, + final Set<String> consumingSegments, String table) + throws InvalidConfigException { + when(helix.getServerToSegmentsMap(anyString())).thenAnswer(invocationOnMock -> subsetOfServerSegments(servers)); + when(helix.getDataInstanceAdminEndpoints(ArgumentMatchers.anySet())) + .thenAnswer(invocationOnMock -> serverEndpoints(servers)); + when(helix.getConsumingSegments(anyString())).thenAnswer(invocationOnMock -> consumingSegments); + ConsumingSegmentInfoReader reader = new ConsumingSegmentInfoReader(executor, connectionManager, helix); + return reader.getConsumingSegmentsInfo(table, timeoutMsec); + } + + @Test + public void testEmptyTable() + throws InvalidConfigException { + ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap = + testRunner(new String[]{}, Collections.emptySet(), TABLE_NAME); + Assert.assertTrue(consumingSegmentsInfoMap._segmentToConsumingInfoMap.isEmpty()); + } + + /** + * 2 servers, 2 partitions, 2 replicas, all CONSUMING + */ + @Test + public void testHappyPath() + throws InvalidConfigException { + final String[] servers = {"server0", "server1"}; + ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap = + testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME); + + List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos = + consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0); + Assert.assertEquals(consumingSegmentInfos.size(), 2); + for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) { + checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server1"), 
ConsumerState.CONSUMING.toString(), "0", + "150"); + } + consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1); + Assert.assertEquals(consumingSegmentInfos.size(), 2); + for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) { + checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server1"), ConsumerState.CONSUMING.toString(), "1", + "150"); + } + } + + /** + * 2 servers, 2 partitions, 2 replicas. p0 consumer in NOT_CONSUMING + */ + @Test + public void testNotConsumingState() + throws InvalidConfigException { + final String[] servers = {"server0", "server2"}; + ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap = + testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME); + List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos = + consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0); + Assert.assertEquals(consumingSegmentInfos.size(), 2); + for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) { + if (info._serverName.equals("server0")) { + checkConsumingSegmentInfo(info, Sets.newHashSet("server0"), ConsumerState.CONSUMING.toString(), "0", "150"); + } else { + checkConsumingSegmentInfo(info, Sets.newHashSet("server2"), ConsumerState.NOT_CONSUMING.toString(), "0", "150"); + } + } + consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1); + Assert.assertEquals(consumingSegmentInfos.size(), 2); + for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) { + checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server2"), ConsumerState.CONSUMING.toString(), "1", + "150"); + } + } + + /** + * 1 servers, 2 partitions, 1 replicas. No consumer for p0. CONSUMING state in idealstate. 
+ */ + @Test + public void testNoConsumerButConsumingInIdealState() + throws InvalidConfigException { + final String[] servers = {"server3"}; + ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap = + testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME); + List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos = + consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0); + Assert.assertTrue(consumingSegmentInfos.isEmpty()); + + consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1); + Assert.assertEquals(consumingSegmentInfos.size(), 1); + checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server3"), + ConsumerState.CONSUMING.toString(), "1", "150"); + } + + /** + * 1 servers, 2 partitions, 1 replicas. No consumer for p0. OFFLINE state in idealstate. + */ + @Test + public void testNoConsumerOfflineInIdealState() + throws InvalidConfigException { + final String[] servers = {"server3"}; + ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap = + testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_1), TABLE_NAME); + List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos = + consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0); + Assert.assertNull(consumingSegmentInfos); + + consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1); + Assert.assertEquals(consumingSegmentInfos.size(), 1); + checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server3"), + ConsumerState.CONSUMING.toString(), "1", "150"); + } + + /** + * 2 servers, 2 partitions, 2 replicas. server4 times out. 
+ */ + @Test + public void testErrorFromServer() + throws InvalidConfigException { + final String[] servers = {"server0", "server4"}; + ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap = + testRunner(servers, Sets.newHashSet(SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1), TABLE_NAME); + List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos = + consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0); + Assert.assertEquals(consumingSegmentInfos.size(), 1); + checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server0"), + ConsumerState.CONSUMING.toString(), "0", "150"); + + consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1); + Assert.assertEquals(consumingSegmentInfos.size(), 1); + checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server0"), + ConsumerState.CONSUMING.toString(), "1", "150"); + } + + private void checkConsumingSegmentInfo(ConsumingSegmentInfoReader.ConsumingSegmentInfo info, Set<String> serverNames, + String consumerState, String partition, String offset) { + Assert.assertTrue(serverNames.contains(info._serverName)); + Assert.assertEquals(info._consumerState, consumerState); + Assert.assertEquals(info._partitionToOffsetMap.get(partition), offset); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java index 060f19b..ab608fd 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java @@ -25,6 +25,7 @@ import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import 
java.util.Set; import java.util.TimerTask; import java.util.concurrent.TimeUnit; @@ -417,6 +418,21 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } @Override + public Map<String, String> getPartitionToCurrentOffset() { + throw new UnsupportedOperationException(); + } + + @Override + public ConsumerState getConsumerState() { + throw new UnsupportedOperationException(); + } + + @Override + public long getLastConsumedTimestamp() { + throw new UnsupportedOperationException(); + } + + @Override public String getSegmentName() { return _segmentName; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 13deed5..62e6d5b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -701,6 +701,23 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } } + @Override + public Map<String, String> getPartitionToCurrentOffset() { + Map<String, String> partitionToCurrentOffset = new HashMap<>(); + partitionToCurrentOffset.put(String.valueOf(_streamPartitionId), _currentOffset.toString()); + return partitionToCurrentOffset; + } + + @Override + public ConsumerState getConsumerState() { + return _state == State.ERROR ? 
ConsumerState.NOT_CONSUMING : ConsumerState.CONSUMING; + } + + @Override + public long getLastConsumedTimestamp() { + return _lastLogTime; + } + @VisibleForTesting protected StreamPartitionMsgOffset getCurrentOffset() { return _currentOffset; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 6d95ffa..0086c4f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.data.manager.realtime; +import java.util.Map; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.data.manager.SegmentDataManager; import org.apache.pinot.core.indexsegment.mutable.MutableSegment; @@ -28,6 +29,14 @@ import org.apache.pinot.core.io.writer.impl.MmapMemoryManager; public abstract class RealtimeSegmentDataManager extends SegmentDataManager { + /** + * The state of the consumer of this segment + */ + public enum ConsumerState { + CONSUMING, + NOT_CONSUMING // In error state + } + @Override public abstract MutableSegment getSegment(); @@ -41,4 +50,16 @@ public abstract class RealtimeSegmentDataManager extends SegmentDataManager { return new DirectMemoryManager(segmentName, serverMetrics); } } + + /** + * Get the current offsets for all partitions of this consumer + */ + public abstract Map<String, String> getPartitionToCurrentOffset(); + + /** + * Get the state of the consumer + */ + public abstract ConsumerState getConsumerState(); + + public abstract long getLastConsumedTimestamp(); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index 
65d1a5f..e5d533c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -45,6 +45,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo; import org.apache.pinot.common.restlet.resources.ResourceUtils; import org.apache.pinot.common.restlet.resources.TableSegments; import org.apache.pinot.common.restlet.resources.TablesList; @@ -52,10 +53,13 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.SegmentDataManager; import org.apache.pinot.core.data.manager.TableDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; import org.apache.pinot.server.api.access.AccessControl; import org.apache.pinot.server.api.access.AccessControlFactory; import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -239,4 +243,41 @@ public class TablesResource { tableDataManager.releaseSegment(segmentDataManager); } } + + @GET + @Path("tables/{realtimeTableName}/consumingSegmentsInfo") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get the info for consumers of this REALTIME table", notes = "Get consumers info from the table data manager") + public List<SegmentConsumerInfo> getConsumingSegmentsInfo( + @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName") String realtimeTableName) { + + TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(realtimeTableName); + if (TableType.OFFLINE == tableType) { + throw new WebApplicationException("Cannot get consuming segment info for OFFLINE table: " + realtimeTableName); + } + String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName); + + List<SegmentConsumerInfo> segmentConsumerInfoList = new ArrayList<>(); + TableDataManager tableDataManager = checkGetTableDataManager(tableNameWithType); + List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireAllSegments(); + try { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + if (segmentDataManager instanceof RealtimeSegmentDataManager) { + RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; + String segmentName = segmentDataManager.getSegmentName(); + segmentConsumerInfoList.add( + new SegmentConsumerInfo(segmentName, realtimeSegmentDataManager.getConsumerState().toString(), + realtimeSegmentDataManager.getLastConsumedTimestamp(), + realtimeSegmentDataManager.getPartitionToCurrentOffset())); + } + } + } catch (Exception e) { + throw new WebApplicationException("Caught exception when getting consumer info for table: " + realtimeTableName); + } finally { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + tableDataManager.releaseSegment(segmentDataManager); + } + } + return segmentConsumerInfoList; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org