This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7b69d094be Refactoring the upsert compaction related code (#12275) 7b69d094be is described below commit 7b69d094be7bc410d4bf8fe15e4c30233421e300 Author: Seunghyun Lee <seungh...@startree.ai> AuthorDate: Mon Jan 22 00:55:40 2024 -0800 Refactoring the upsert compaction related code (#12275) * Refactoring the upsert compaction related code 1. Fix the issue with fetching validDocId metadata for table with a large number of segments. (Added POST API with list of segments to be part of the request body) 2. Added POST support for MultiHttpRequest to cover 1. 3. Added GET /tables/<tableName>/validDocIdMetadata API on the controller for improving debuggability. * Addressing comments --- .../requesthandler/BaseBrokerRequestHandler.java | 4 +- .../apache/pinot/common/http/MultiHttpRequest.java | 51 +++++-- .../restlet/resources/ValidDocIdMetadataInfo.java | 56 +++++++ .../pinot/common/http/MultiHttpRequestTest.java | 106 ++++++++++++-- .../api/resources/PinotRunningQueryResource.java | 2 +- .../api/resources/PinotTableRestletResource.java | 37 +++++ .../controller/util/CompletionServiceHelper.java | 52 ++++++- .../util/ServerSegmentMetadataReader.java | 161 ++++++++++++++++++--- .../pinot/controller/util/TableMetadataReader.java | 30 +++- .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 41 ++++++ .../UpsertCompactionTaskExecutor.java | 141 +----------------- .../UpsertCompactionTaskGenerator.java | 106 ++++---------- .../UpsertCompactionTaskExecutorTest.java | 7 +- .../UpsertCompactionTaskGeneratorTest.java | 86 +++-------- .../readers/CompactedPinotSegmentRecordReader.java | 106 ++++++++++++++ .../pinot/server/api/resources/TablesResource.java | 57 ++++++-- .../pinot/server/api/TablesResourceTest.java | 22 +++ 17 files changed, 715 insertions(+), 350 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index b2da166510..cc640110fc 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -209,9 +209,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for // details String globalQueryId = getGlobalQueryId(requestId); - List<String> serverUrls = new ArrayList<>(); + List<Pair<String, String>> serverUrls = new ArrayList<>(); for (ServerInstance serverInstance : queryServers._servers) { - serverUrls.add(String.format("%s/query/%s", serverInstance.getAdminEndpoint(), globalQueryId)); + serverUrls.add(Pair.of(String.format("%s/query/%s", serverInstance.getAdminEndpoint(), globalQueryId), null)); } LOGGER.debug("Cancelling the query: {} via server urls: {}", queryServers._query, serverUrls); CompletionService<MultiHttpRequestResponse> completionService = diff --git a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java index a28674e2ad..73efb2c598 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java @@ -19,6 +19,7 @@ package org.apache.pinot.common.http; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletionService; @@ -26,11 +27,14 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.function.Function; import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; import 
org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.conn.HttpClientConnectionManager; +import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; @@ -66,14 +70,29 @@ public class MultiHttpRequest { * @return instance of CompletionService. Completion service will provide * results as they arrive. The order is NOT same as the order of URLs */ - public CompletionService<MultiHttpRequestResponse> execute(List<String> urls, + public CompletionService<MultiHttpRequestResponse> executeGet(List<String> urls, @Nullable Map<String, String> requestHeaders, int timeoutMs) { - return execute(urls, requestHeaders, timeoutMs, "GET", HttpGet::new); + List<Pair<String, String>> urlsAndRequestBodies = new ArrayList<>(); + urls.forEach(url -> urlsAndRequestBodies.add(Pair.of(url, ""))); + return execute(urlsAndRequestBodies, requestHeaders, timeoutMs, "GET", HttpGet::new); + } + + /** + * POST urls in parallel using the executor service. + * @param urlsAndRequestBodies absolute URLs to POST + * @param requestHeaders headers to set when making the request + * @param timeoutMs timeout in milliseconds for each POST request + * @return instance of CompletionService. Completion service will provide + * results as they arrive. The order is NOT same as the order of URLs + */ + public CompletionService<MultiHttpRequestResponse> executePost(List<Pair<String, String>> urlsAndRequestBodies, + @Nullable Map<String, String> requestHeaders, int timeoutMs) { + return execute(urlsAndRequestBodies, requestHeaders, timeoutMs, "POST", HttpPost::new); } /** * Execute certain http method on the urls in parallel using the executor service. 
- * @param urls absolute URLs to execute the http method + * @param urlsAndRequestBodies absolute URLs to execute the http method * @param requestHeaders headers to set when making the request * @param timeoutMs timeout in milliseconds for each http request * @param httpMethodName the name of the http method like GET, DELETE etc. @@ -81,22 +100,28 @@ public class MultiHttpRequest { * @return instance of CompletionService. Completion service will provide * results as they arrive. The order is NOT same as the order of URLs */ - public <T extends HttpRequestBase> CompletionService<MultiHttpRequestResponse> execute(List<String> urls, - @Nullable Map<String, String> requestHeaders, int timeoutMs, String httpMethodName, - Function<String, T> httpRequestBaseSupplier) { + public <T extends HttpRequestBase> CompletionService<MultiHttpRequestResponse> execute( + List<Pair<String, String>> urlsAndRequestBodies, @Nullable Map<String, String> requestHeaders, int timeoutMs, + String httpMethodName, Function<String, T> httpRequestBaseSupplier) { // Create global request configuration - RequestConfig defaultRequestConfig = RequestConfig.custom() - .setConnectionRequestTimeout(timeoutMs) - .setSocketTimeout(timeoutMs).build(); // setting the socket + RequestConfig defaultRequestConfig = + RequestConfig.custom().setConnectionRequestTimeout(timeoutMs).setSocketTimeout(timeoutMs) + .build(); // setting the socket - HttpClientBuilder httpClientBuilder = HttpClients.custom() - .setConnectionManager(_connectionManager).setDefaultRequestConfig(defaultRequestConfig); + HttpClientBuilder httpClientBuilder = + HttpClients.custom().setConnectionManager(_connectionManager).setDefaultRequestConfig(defaultRequestConfig); CompletionService<MultiHttpRequestResponse> completionService = new ExecutorCompletionService<>(_executor); CloseableHttpClient client = httpClientBuilder.build(); - for (String url : urls) { + for (Pair<String, String> pair : urlsAndRequestBodies) { completionService.submit(() 
-> { - T httpMethod = httpRequestBaseSupplier.apply(url); + String url = pair.getLeft(); + String body = pair.getRight(); + HttpRequestBase httpMethod = httpRequestBaseSupplier.apply(url); + // If the http method is POST, set the request body + if (httpMethod instanceof HttpPost) { + ((HttpPost) httpMethod).setEntity(new StringEntity(body)); + } if (requestHeaders != null) { requestHeaders.forEach(((HttpRequestBase) httpMethod)::setHeader); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java new file mode 100644 index 0000000000..475ba91432 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.restlet.resources; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + + +@JsonIgnoreProperties(ignoreUnknown = true) +public class ValidDocIdMetadataInfo { + private final String _segmentName; + private final long _totalValidDocs; + private final long _totalInvalidDocs; + private final long _totalDocs; + + public ValidDocIdMetadataInfo(@JsonProperty("segmentName") String segmentName, + @JsonProperty("totalValidDocs") long totalValidDocs, @JsonProperty("totalInvalidDocs") long totalInvalidDocs, + @JsonProperty("totalDocs") long totalDocs) { + _segmentName = segmentName; + _totalValidDocs = totalValidDocs; + _totalInvalidDocs = totalInvalidDocs; + _totalDocs = totalDocs; + } + + public String getSegmentName() { + return _segmentName; + } + + public long getTotalValidDocs() { + return _totalValidDocs; + } + + public long getTotalInvalidDocs() { + return _totalInvalidDocs; + } + + public long getTotalDocs() { + return _totalDocs; + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java index 61f58f78f6..292e2ec82e 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import org.apache.commons.lang3.tuple.Pair; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; @@ -43,6 +44,30 @@ import org.testng.annotations.Test; public class MultiHttpRequestTest { + class TestResult { + private final int _success; + private final int _errors; + private final int 
_timeouts; + + public TestResult(int success, int errors, int timeouts) { + _success = success; + _errors = errors; + _timeouts = timeouts; + } + + public int getSuccess() { + return _success; + } + + public int getErrors() { + return _errors; + } + + public int getTimeouts() { + return _timeouts; + } + } + private static final Logger LOGGER = LoggerFactory.getLogger(MultiHttpRequest.class); private static final String SUCCESS_MSG = "success"; private static final String ERROR_MSG = "error"; @@ -61,6 +86,7 @@ public class MultiHttpRequestTest { startServer(_portStart, createHandler(SUCCESS_CODE, SUCCESS_MSG, 0)); startServer(_portStart + 1, createHandler(ERROR_CODE, ERROR_MSG, 0)); startServer(_portStart + 2, createHandler(SUCCESS_CODE, TIMEOUT_MSG, TIMEOUT_MS)); + startServer(_portStart + 3, createPostHandler(SUCCESS_CODE, SUCCESS_MSG, 0)); } @AfterTest @@ -90,6 +116,33 @@ public class MultiHttpRequestTest { }; } + private HttpHandler createPostHandler(final int status, final String msg, final int sleepTimeMs) { + return new HttpHandler() { + @Override + public void handle(HttpExchange httpExchange) + throws IOException { + if (sleepTimeMs > 0) { + try { + Thread.sleep(sleepTimeMs); + } catch (InterruptedException e) { + LOGGER.info("Handler interrupted during sleep"); + } + } + if (httpExchange.getRequestMethod().equals("POST")) { + httpExchange.sendResponseHeaders(status, msg.length()); + OutputStream responseBody = httpExchange.getResponseBody(); + responseBody.write(msg.getBytes()); + responseBody.close(); + } else { + httpExchange.sendResponseHeaders(ERROR_CODE, ERROR_MSG.length()); + OutputStream responseBody = httpExchange.getResponseBody(); + responseBody.write(ERROR_MSG.getBytes()); + responseBody.close(); + } + } + }; + } + private void startServer(int port, HttpHandler handler) throws IOException { final HttpServer server = HttpServer.create(new InetSocketAddress(port), 0); @@ -104,22 +157,58 @@ public class MultiHttpRequestTest { } @Test - public void 
testMultiGet() throws Exception { - MultiHttpRequest mget = - new MultiHttpRequest(Executors.newCachedThreadPool(), new PoolingHttpClientConnectionManager()); + public void testMultiGet() { List<String> urls = Arrays.asList("http://localhost:" + String.valueOf(_portStart) + URI_PATH, "http://localhost:" + String.valueOf(_portStart + 1) + URI_PATH, "http://localhost:" + String.valueOf(_portStart + 2) + URI_PATH, // 2nd request to the same server - "http://localhost:" + String.valueOf(_portStart) + URI_PATH); + "http://localhost:" + String.valueOf(_portStart) + URI_PATH, + "http://localhost:" + String.valueOf(_portStart + 3) + URI_PATH); + + MultiHttpRequest mget = + new MultiHttpRequest(Executors.newCachedThreadPool(), new PoolingHttpClientConnectionManager()); + // timeout value needs to be less than 5000ms set above for // third server final int requestTimeoutMs = 1000; - CompletionService<MultiHttpRequestResponse> completionService = mget.execute(urls, null, requestTimeoutMs); + CompletionService<MultiHttpRequestResponse> completionService = mget.executeGet(urls, null, requestTimeoutMs); + + TestResult result = collectResult(completionService, urls.size()); + Assert.assertEquals(result.getSuccess(), 2); + Assert.assertEquals(result.getErrors(), 2); + Assert.assertEquals(result.getTimeouts(), 1); + } + + @Test + public void testMultiPost() { + List<Pair<String, String>> urlsAndRequestBodies = + List.of(Pair.of("http://localhost:" + String.valueOf(_portStart) + URI_PATH, "b0"), + Pair.of("http://localhost:" + String.valueOf(_portStart + 1) + URI_PATH, "b1"), + Pair.of("http://localhost:" + String.valueOf(_portStart + 2) + URI_PATH, "b2"), + // 2nd request to the same server + Pair.of("http://localhost:" + String.valueOf(_portStart) + URI_PATH, "b3"), + Pair.of("http://localhost:" + String.valueOf(_portStart + 3) + URI_PATH, "b4")); + + MultiHttpRequest mpost = + new MultiHttpRequest(Executors.newCachedThreadPool(), new PoolingHttpClientConnectionManager()); + + // 
timeout value needs to be less than 5000ms set above for + // third server + final int requestTimeoutMs = 1000; + CompletionService<MultiHttpRequestResponse> completionService = + mpost.executePost(urlsAndRequestBodies, null, requestTimeoutMs); + + TestResult result = collectResult(completionService, urlsAndRequestBodies.size()); + Assert.assertEquals(result.getSuccess(), 3); + Assert.assertEquals(result.getErrors(), 1); + Assert.assertEquals(result.getTimeouts(), 1); + } + + private TestResult collectResult(CompletionService<MultiHttpRequestResponse> completionService, int size) { int success = 0; int errors = 0; int timeouts = 0; - for (int i = 0; i < urls.size(); i++) { + for (int i = 0; i < size; i++) { try (MultiHttpRequestResponse httpRequestResponse = completionService.take().get()) { if (httpRequestResponse.getResponse().getStatusLine().getStatusCode() >= 300) { errors++; @@ -143,9 +232,6 @@ public class MultiHttpRequestTest { errors++; } } - - Assert.assertEquals(success, 2); - Assert.assertEquals(errors, 1); - Assert.assertEquals(timeouts, 1); + return new TestResult(success, errors, timeouts); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java index 8aa004b35b..29bdccf4eb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java @@ -187,7 +187,7 @@ public class PinotRunningQueryResource { } LOGGER.debug("Getting running queries via broker urls: {}", brokerUrls); CompletionService<MultiHttpRequestResponse> completionService = - new MultiHttpRequest(_executor, _httpConnMgr).execute(brokerUrls, requestHeaders, timeoutMs); + new MultiHttpRequest(_executor, _httpConnMgr).executeGet(brokerUrls, requestHeaders, timeoutMs); 
Map<String, Map<String, String>> queriesByBroker = new HashMap<>(); List<String> errMsgs = new ArrayList<>(brokerUrls.size()); for (int i = 0; i < brokerUrls.size(); i++) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 5d35e1f0c3..d7894c08f7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -950,6 +950,43 @@ public class PinotTableRestletResource { return segmentsMetadata; } + @GET + @Path("tables/{tableName}/validDocIdMetadata") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_METADATA) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get the aggregate valid doc id metadata of all segments for a table", notes = "Get the " + + "aggregate valid doc id metadata of all segments for a table") + public String getTableAggregateValidDocIdMetadata( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, + @ApiParam(value = "A list of segments", allowMultiple = true) @QueryParam("segmentNames") + List<String> segmentNames) { + LOGGER.info("Received a request to fetch aggregate valid doc id metadata for a table {}", tableName); + TableType tableType = Constants.validateTableType(tableTypeStr); + if (tableType == TableType.OFFLINE) { + throw new ControllerApplicationException(LOGGER, "Table type : " + tableTypeStr + " not yet supported.", + Response.Status.NOT_IMPLEMENTED); + } + String tableNameWithType = + ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); + + String 
validDocIdMetadata; + try { + TableMetadataReader tableMetadataReader = + new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager); + JsonNode segmentsMetadataJson = + tableMetadataReader.getAggregateValidDocIdMetadata(tableNameWithType, segmentNames, + _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + validDocIdMetadata = JsonUtils.objectToPrettyString(segmentsMetadataJson); + } catch (InvalidConfigException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST); + } catch (IOException ioe) { + throw new ControllerApplicationException(LOGGER, "Error parsing Pinot server response: " + ioe.getMessage(), + Response.Status.INTERNAL_SERVER_ERROR, ioe); + } + return validDocIdMetadata; + } + @GET @Path("tables/{tableName}/indexes") @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_METADATA) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java index d26f5ba603..f0b36b2cfa 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.concurrent.CompletionService; import java.util.concurrent.Executor; import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.util.EntityUtils; import org.apache.pinot.common.http.MultiHttpRequest; @@ -79,12 +80,46 @@ public class CompletionServiceHelper { public CompletionServiceResponse doMultiGetRequest(List<String> serverURLs, String tableNameWithType, boolean multiRequestPerServer, @Nullable Map<String, String> requestHeaders, int timeoutMs, @Nullable String useCase) { 
- CompletionServiceResponse completionServiceResponse = new CompletionServiceResponse(); - // TODO: use some service other than completion service so that we know which server encounters the error CompletionService<MultiHttpRequestResponse> completionService = - new MultiHttpRequest(_executor, _httpConnectionManager).execute(serverURLs, requestHeaders, timeoutMs); - for (int i = 0; i < serverURLs.size(); i++) { + new MultiHttpRequest(_executor, _httpConnectionManager).executeGet(serverURLs, requestHeaders, timeoutMs); + + return collectResponse(tableNameWithType, serverURLs.size(), completionService, multiRequestPerServer, useCase); + } + + /** + * This method makes a MultiPost call to all given URLs and its corresponding bodies. + * @param serverURLsAndRequestBodies server urls to send GET request. + * @param tableNameWithType table name with type suffix + * @param multiRequestPerServer it's possible that need to send multiple requests to a same server. + * If multiRequestPerServer is set as false, return as long as one of the requests get + * response; If multiRequestPerServer is set as true, wait until all requests + * get response. + * @param requestHeaders Headers to be set when making the http calls. + * @param timeoutMs timeout in milliseconds to wait per request. + * @param useCase the use case initiating the multi-get request. If not null and an exception is thrown, only the + * error message and the use case are logged instead of the full stack trace. + * @return CompletionServiceResponse Map of the endpoint(server instance, or full request path if + * multiRequestPerServer is true) to the response from that endpoint. 
+ */ + public CompletionServiceResponse doMultiPostRequest(List<Pair<String, String>> serverURLsAndRequestBodies, + String tableNameWithType, boolean multiRequestPerServer, @Nullable Map<String, String> requestHeaders, + int timeoutMs, @Nullable String useCase) { + + CompletionService<MultiHttpRequestResponse> completionService = + new MultiHttpRequest(_executor, _httpConnectionManager).executePost(serverURLsAndRequestBodies, requestHeaders, + timeoutMs); + + return collectResponse(tableNameWithType, serverURLsAndRequestBodies.size(), completionService, + multiRequestPerServer, useCase); + } + + private CompletionServiceResponse collectResponse(String tableNameWithType, int size, + CompletionService<MultiHttpRequestResponse> completionService, boolean multiRequestPerServer, + @Nullable String useCase) { + CompletionServiceResponse completionServiceResponse = new CompletionServiceResponse(); + + for (int i = 0; i < size; i++) { MultiHttpRequestResponse multiHttpRequestResponse = null; try { multiHttpRequestResponse = completionService.take().get(); @@ -93,7 +128,8 @@ public class CompletionServiceHelper { _endpointsToServers.get(String.format("%s://%s:%d", uri.getScheme(), uri.getHost(), uri.getPort())); int statusCode = multiHttpRequestResponse.getResponse().getStatusLine().getStatusCode(); if (statusCode >= 300) { - LOGGER.error("Server: {} returned error: {}", instance, statusCode); + String reason = multiHttpRequestResponse.getResponse().getStatusLine().getReasonPhrase(); + LOGGER.error("Server: {} returned error: {}, reason: {}", instance, statusCode, reason); completionServiceResponse._failedResponseCount++; continue; } @@ -102,7 +138,7 @@ public class CompletionServiceHelper { .put(multiRequestPerServer ? uri.toString() : instance, responseString); } catch (Exception e) { String reason = useCase == null ? "" : String.format(" in '%s'", useCase); - LOGGER.error("Connection error{}. Details: {}", reason, e.getMessage()); + LOGGER.error("Connection error {}. 
Details: {}", reason, e.getMessage()); completionServiceResponse._failedResponseCount++; } finally { if (multiHttpRequestResponse != null) { @@ -116,9 +152,9 @@ public class CompletionServiceHelper { } int numServersResponded = completionServiceResponse._httpResponses.size(); - if (numServersResponded != serverURLs.size()) { + if (numServersResponded != size) { LOGGER.warn("Finished reading information for table: {} with {}/{} server responses", tableNameWithType, - numServersResponded, serverURLs.size()); + numServersResponded, size); } else { LOGGER.info("Finished reading information for table: {}", tableNameWithType); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index a51881d9e3..de1cae193c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -18,19 +18,35 @@ */ package org.apache.pinot.controller.util; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Preconditions; import com.google.common.collect.BiMap; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import javax.annotation.Nullable; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.tuple.Pair; import org.apache.http.conn.HttpClientConnectionManager; +import 
org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.pinot.common.restlet.resources.TableMetadataInfo; +import org.apache.pinot.common.restlet.resources.TableSegments; +import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo; +import org.apache.pinot.common.utils.RoaringBitmapUtils; import org.apache.pinot.spi.utils.JsonUtils; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +63,11 @@ public class ServerSegmentMetadataReader { private final Executor _executor; private final HttpClientConnectionManager _connectionManager; + public ServerSegmentMetadataReader() { + _executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + _connectionManager = new PoolingHttpClientConnectionManager(); + } + public ServerSegmentMetadataReader(Executor executor, HttpClientConnectionManager connectionManager) { _executor = executor; _connectionManager = connectionManager; @@ -100,11 +121,11 @@ public class ServerSegmentMetadataReader { tableMetadataInfo.getColumnCardinalityMap().forEach((k, v) -> columnCardinalityMap.merge(k, v, Double::sum)); tableMetadataInfo.getMaxNumMultiValuesMap().forEach((k, v) -> maxNumMultiValuesMap.merge(k, v, Double::sum)); tableMetadataInfo.getColumnIndexSizeMap().forEach((k, v) -> columnIndexSizeMap.merge(k, v, (l, r) -> { - for (Map.Entry<String, Double> e : r.entrySet()) { - l.put(e.getKey(), l.getOrDefault(e.getKey(), 0d) + e.getValue()); - } - return l; - })); + for (Map.Entry<String, Double> e : r.entrySet()) { + l.put(e.getKey(), l.getOrDefault(e.getKey(), 0d) + e.getValue()); + } + return l; + })); } catch (IOException e) { failedParses++; LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e); @@ -180,27 +201,129 @@ public class ServerSegmentMetadataReader { return 
segmentsMetadata; } + /** + * This method is called when the API request is to fetch validDocId metadata for a list segments of the given table. + * This method will pick a server that hosts the target segment and fetch the segment metadata result. + * + * @return segment metadata as a JSON string + */ + public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String tableNameWithType, + Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints, + @Nullable List<String> segmentNames, int timeoutMs) { + List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>(); + for (Map.Entry<String, List<String>> serverToSegments : serverToSegmentsMap.entrySet()) { + List<String> segmentsForServer = serverToSegments.getValue(); + List<String> segmentsToQuery = new ArrayList<>(); + if (segmentNames == null || segmentNames.isEmpty()) { + segmentsToQuery.addAll(segmentsForServer); + } else { + Set<String> segmentNamesLookUpTable = new HashSet<>(segmentNames); + for (String segment : segmentsForServer) { + if (segmentNamesLookUpTable.contains(segment)) { + segmentsToQuery.add(segment); + } + } + } + serverURLsAndBodies.add(generateValidDocIdMetadataURL(tableNameWithType, segmentsToQuery, + serverToEndpoints.get(serverToSegments.getKey()))); + } + + // request the urls from the servers + CompletionServiceHelper completionServiceHelper = + new CompletionServiceHelper(_executor, _connectionManager, serverToEndpoints); + + Map<String, String> requestHeaders = Map.of("Content-Type", "application/json"); + CompletionServiceHelper.CompletionServiceResponse serviceResponse = + completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, tableNameWithType, false, requestHeaders, + timeoutMs, null); + + List<ValidDocIdMetadataInfo> validDocIdMetadataInfos = new ArrayList<>(); + int failedParses = 0; + int returnedSegmentsCount = 0; + for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) { + try { + String 
validDocIdMetadataList = streamResponse.getValue(); + List<ValidDocIdMetadataInfo> validDocIdMetadataInfo = + JsonUtils.stringToObject(validDocIdMetadataList, new TypeReference<ArrayList<ValidDocIdMetadataInfo>>() { + }); + validDocIdMetadataInfos.addAll(validDocIdMetadataInfo); + returnedSegmentsCount++; + } catch (Exception e) { + failedParses++; + LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e); + } + } + if (failedParses != 0) { + LOGGER.error("Unable to parse server {} / {} response due to an error: ", failedParses, + serverURLsAndBodies.size()); + } + + if (segmentNames != null && returnedSegmentsCount != segmentNames.size()) { + LOGGER.error("Unable to get validDocIdMetadata from all servers. Expected: {}, Actual: {}", segmentNames.size(), + returnedSegmentsCount); + } + LOGGER.info("Retrieved valid doc id metadata for {} segments from {} servers.", returnedSegmentsCount, + serverURLsAndBodies.size()); + return validDocIdMetadataInfos; + } + + /** + * This method is called when the API request is to fetch validDocIds for a segment of the given table. This method + * will pick a server that hosts the target segment and fetch the validDocIds result. 
+ * + * @return a bitmap of validDocIds + */ + public RoaringBitmap getValidDocIdsFromServer(String tableNameWithType, String segmentName, String endpoint, + int timeoutMs) { + // Build the endpoint url + String url = generateValidDocIdsURL(tableNameWithType, segmentName, endpoint); + + // Set timeout + ClientConfig clientConfig = new ClientConfig(); + clientConfig.property(ClientProperties.CONNECT_TIMEOUT, timeoutMs); + clientConfig.property(ClientProperties.READ_TIMEOUT, timeoutMs); + + Response response = ClientBuilder.newClient(clientConfig).target(url).request().get(Response.class); + Preconditions.checkState(response.getStatus() == Response.Status.OK.getStatusCode(), + "Unable to retrieve validDocIds from %s", url); + byte[] validDocIds = response.readEntity(byte[].class); + return RoaringBitmapUtils.deserialize(validDocIds); + } + private String generateAggregateSegmentMetadataServerURL(String tableNameWithType, List<String> columns, String endpoint) { - try { - tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8.name()); - String paramsStr = generateColumnsParam(columns); - return String.format("%s/tables/%s/metadata?%s", endpoint, tableNameWithType, paramsStr); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e.getCause()); - } + tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8); + String paramsStr = generateColumnsParam(columns); + return String.format("%s/tables/%s/metadata?%s", endpoint, tableNameWithType, paramsStr); } private String generateSegmentMetadataServerURL(String tableNameWithType, String segmentName, List<String> columns, String endpoint) { + tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8); + segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8); + String paramsStr = generateColumnsParam(columns); + return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint, tableNameWithType, segmentName, paramsStr); 
+ } + + private String generateValidDocIdsURL(String tableNameWithType, String segmentName, String endpoint) { + tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8); + segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8); + return String.format("%s/segments/%s/%s/validDocIds", endpoint, tableNameWithType, segmentName); + } + + private Pair<String, String> generateValidDocIdMetadataURL(String tableNameWithType, List<String> segmentNames, + String endpoint) { + tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8); + TableSegments tableSegments = new TableSegments(segmentNames); + String jsonTableSegments; try { - tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8.name()); - segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8.name()); - String paramsStr = generateColumnsParam(columns); - return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint, tableNameWithType, segmentName, paramsStr); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e.getCause()); + jsonTableSegments = JsonUtils.objectToString(tableSegments); + } catch (JsonProcessingException e) { + LOGGER.error("Failed to convert segment names to json request body: segmentNames={}", segmentNames); + throw new RuntimeException(e); } + return Pair.of( + String.format("%s/tables/%s/validDocIdMetadata", endpoint, tableNameWithType), jsonTableSegments); } private String generateColumnsParam(List<String> columns) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java index 7ef5302c1b..f92747b497 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java @@ -31,6 +31,7 @@ import 
java.util.stream.Collectors; import org.apache.http.conn.HttpClientConnectionManager; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.restlet.resources.TableMetadataInfo; +import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.spi.utils.JsonUtils; @@ -81,8 +82,9 @@ public class TableMetadataReader { } } - List<String> segmentsMetadata = serverSegmentMetadataReader - .getSegmentMetadataFromServer(tableNameWithType, serverToSegmentsMap, endpoints, columns, timeoutMs); + List<String> segmentsMetadata = + serverSegmentMetadataReader.getSegmentMetadataFromServer(tableNameWithType, serverToSegmentsMap, endpoints, + columns, timeoutMs); Map<String, JsonNode> response = new HashMap<>(); for (String segmentMetadata : segmentsMetadata) { JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata); @@ -146,8 +148,28 @@ public class TableMetadataReader { ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader(_executor, _connectionManager); - TableMetadataInfo aggregateTableMetadataInfo = serverSegmentMetadataReader - .getAggregatedTableMetadataFromServer(tableNameWithType, endpoints, columns, numReplica, timeoutMs); + TableMetadataInfo aggregateTableMetadataInfo = + serverSegmentMetadataReader.getAggregatedTableMetadataFromServer(tableNameWithType, endpoints, columns, + numReplica, timeoutMs); + return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo); + } + + /** + * This method retrieves the aggregated valid doc id metadata for a given table. 
+ * @return a list of ValidDocIdMetadataInfo + */ + public JsonNode getAggregateValidDocIdMetadata(String tableNameWithType, List<String> segmentNames, int timeoutMs) + throws InvalidConfigException { + final Map<String, List<String>> serverToSegments = + _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); + BiMap<String, String> endpoints = + _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet()); + ServerSegmentMetadataReader serverSegmentMetadataReader = + new ServerSegmentMetadataReader(_executor, _connectionManager); + + List<ValidDocIdMetadataInfo> aggregateTableMetadataInfo = + serverSegmentMetadataReader.getValidDocIdMetadataFromServer(tableNameWithType, serverToSegments, endpoints, + segmentNames, timeoutMs); return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index a46f903ed9..31f5d6039c 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -21,15 +21,23 @@ package org.apache.pinot.plugin.minion.tasks; import java.net.URI; import java.util.HashMap; import java.util.Map; +import org.apache.helix.HelixAdmin; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; +import org.apache.pinot.controller.util.ServerSegmentMetadataReader; +import org.apache.pinot.minion.MinionContext; import org.apache.pinot.spi.env.PinotConfiguration; 
  /**
   * Fetches the validDocIds bitmap for the given segment from the server currently hosting it
   * in ONLINE state.
   *
   * @param tableNameWithType table name with type suffix
   * @param segmentName segment to fetch validDocIds for
   * @param configs task configs (currently unused here; kept for interface stability)
   * @param minionContext minion context used to resolve the Helix cluster and the hosting server
   * @return a bitmap of validDocIds for the segment
   */
  public static RoaringBitmap getValidDocIds(String tableNameWithType, String segmentName, Map<String, String> configs,
      MinionContext minionContext) {
    HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
    String clusterName = minionContext.getHelixManager().getClusterName();

    // Resolve an ONLINE server for the segment and look up its admin endpoint.
    String server = getServer(segmentName, tableNameWithType, helixAdmin, clusterName);
    InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
    String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);

    // Download the validDocIds bitmap from that server. (The earlier comment about "skipping
    // column related stats by passing an empty list" was a stale copy-paste from the metadata
    // path — no column list is involved here.)
    // NOTE(review): timeout is hard-coded to 60s — consider making it configurable via configs.
    ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
    return serverSegmentMetadataReader.getValidDocIdsFromServer(tableNameWithType, segmentName, endpoint, 60_000);
  }

  /**
   * Finds a server hosting the given segment in ONLINE state according to the table's external view.
   *
   * @param segmentName segment to locate
   * @param tableNameWithType table name with type suffix
   * @param helixAdmin Helix admin used to read the external view and instance configs
   * @param clusterName Helix cluster name
   * @return the instance name of an ONLINE replica's server
   * @throws IllegalStateException if the external view, the segment, or an ONLINE replica is missing
   */
  public static String getServer(String segmentName, String tableNameWithType, HelixAdmin helixAdmin,
      String clusterName) {
    ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
    if (externalView == null) {
      throw new IllegalStateException("External view does not exist for table: " + tableNameWithType);
    }
    Map<String, String> instanceStateMap = externalView.getStateMap(segmentName);
    if (instanceStateMap == null) {
      throw new IllegalStateException("Failed to find segment: " + segmentName);
    }
    // Return the first replica that is ONLINE; iteration order is not significant.
    for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
      if (entry.getValue().equals(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)) {
        return entry.getKey();
      }
    }
    throw new IllegalStateException("Failed to find ONLINE server for segment: " + segmentName);
  }
java.io.IOException; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; -import java.util.Set; -import javax.annotation.Nullable; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.core.Response; import org.apache.commons.io.FileUtils; -import org.apache.helix.HelixAdmin; -import org.apache.helix.HelixManager; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.InstanceConfig; -import org.apache.http.client.utils.URIBuilder; import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; -import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor; +import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils; import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; -import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; +import org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.data.readers.RecordReader; -import org.apache.pinot.spi.data.readers.RecordReaderConfig; -import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; -import org.roaringbitmap.PeekableIntIterator; -import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExecutor { 
private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class); - private static HelixManager _helixManager = MINION_CONTEXT.getHelixManager(); - private static HelixAdmin _clusterManagementTool = _helixManager.getClusterManagmentTool(); - private static String _clusterName = _helixManager.getClusterName(); - - private class CompactedRecordReader implements RecordReader { - private final PinotSegmentRecordReader _pinotSegmentRecordReader; - private final PeekableIntIterator _validDocIdsIterator; - // Reusable generic row to store the next row to return - GenericRow _nextRow = new GenericRow(); - // Flag to mark whether we need to fetch another row - boolean _nextRowReturned = true; - - CompactedRecordReader(File indexDir, ImmutableRoaringBitmap validDocIds) { - _pinotSegmentRecordReader = new PinotSegmentRecordReader(); - _pinotSegmentRecordReader.init(indexDir, null, null); - _validDocIdsIterator = validDocIds.getIntIterator(); - } - - @Override - public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) { - } - - @Override - public boolean hasNext() { - if (!_validDocIdsIterator.hasNext() && _nextRowReturned) { - return false; - } - - // If next row has not been returned, return true - if (!_nextRowReturned) { - return true; - } - - // Try to get the next row to return - if (_validDocIdsIterator.hasNext()) { - int docId = _validDocIdsIterator.next(); - _nextRow.clear(); - _pinotSegmentRecordReader.getRecord(docId, _nextRow); - _nextRowReturned = false; - return true; - } - - // Cannot find next row to return, return false - return false; - } - - @Override - public GenericRow next() { - return next(new GenericRow()); - } - - @Override - public GenericRow next(GenericRow reuse) { - Preconditions.checkState(!_nextRowReturned); - reuse.init(_nextRow); - _nextRowReturned = true; - return reuse; - } - - @Override - public void rewind() { - _pinotSegmentRecordReader.rewind(); - 
_nextRowReturned = true; - } - - @Override - public void close() - throws IOException { - _pinotSegmentRecordReader.close(); - } - } @Override protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir) @@ -143,7 +53,7 @@ public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExe String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); TableConfig tableConfig = getTableConfig(tableNameWithType); - ImmutableRoaringBitmap validDocIds = getValidDocIds(tableNameWithType, configs); + RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIds(tableNameWithType, segmentName, configs, MINION_CONTEXT); if (validDocIds.isEmpty()) { // prevents empty segment generation @@ -159,7 +69,8 @@ public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExe } SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir); - try (CompactedRecordReader compactedRecordReader = new CompactedRecordReader(indexDir, validDocIds)) { + try (CompactedPinotSegmentRecordReader compactedRecordReader = new CompactedPinotSegmentRecordReader(indexDir, + validDocIds)) { SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, tableConfig, segmentMetadata, segmentName); SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); driver.init(config, compactedRecordReader); @@ -198,46 +109,6 @@ public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExe return config; } - // TODO: Consider moving this method to a more appropriate class (eg ServerSegmentMetadataReader) - private static ImmutableRoaringBitmap getValidDocIds(String tableNameWithType, Map<String, String> configs) - throws URISyntaxException { - String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY); - String server = getServer(segmentName, tableNameWithType); - - // get the url for the validDocIds for the server - InstanceConfig instanceConfig = 
_clusterManagementTool.getInstanceConfig(_clusterName, server); - String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig); - String url = - new URIBuilder(endpoint).setPath(String.format("/segments/%s/%s/validDocIds", tableNameWithType, segmentName)) - .toString(); - - // get the validDocIds from that server - Response response = ClientBuilder.newClient().target(url).request().get(Response.class); - Preconditions.checkState(response.getStatus() == Response.Status.OK.getStatusCode(), - "Unable to retrieve validDocIds from %s", url); - byte[] snapshot = response.readEntity(byte[].class); - ImmutableRoaringBitmap validDocIds = new ImmutableRoaringBitmap(ByteBuffer.wrap(snapshot)); - return validDocIds; - } - - @VisibleForTesting - public static String getServer(String segmentName, String tableNameWithType) { - ExternalView externalView = _clusterManagementTool.getResourceExternalView(_clusterName, tableNameWithType); - if (externalView == null) { - throw new IllegalStateException("External view does not exist for table: " + tableNameWithType); - } - Map<String, String> instanceStateMap = externalView.getStateMap(segmentName); - if (instanceStateMap == null) { - throw new IllegalStateException("Failed to find segment: " + segmentName); - } - for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { - if (entry.getValue().equals(SegmentStateModel.ONLINE)) { - return entry.getKey(); - } - } - throw new IllegalStateException("Failed to find ONLINE server for segment: " + segmentName); - } - @Override protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig, SegmentConversionResult segmentConversionResult) { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java index 590102319e..e6eaf3679e 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -18,26 +18,20 @@ */ package org.apache.pinot.plugin.minion.tasks.upsertcompaction; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BiMap; -import java.io.IOException; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.http.client.utils.URIBuilder; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; -import org.apache.pinot.controller.util.CompletionServiceHelper; +import org.apache.pinot.controller.util.ServerSegmentMetadataReader; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask; import org.apache.pinot.core.minion.PinotTaskConfig; @@ -45,7 +39,6 @@ import org.apache.pinot.spi.annotations.minion.TaskGenerator; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; 
import org.apache.pinot.spi.utils.CommonConstants; -import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.TimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,6 +82,7 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); for (TableConfig tableConfig : tableConfigs) { if (!validate(tableConfig)) { + LOGGER.warn("Validation failed for table {}. Skipping..", tableConfig.getTableName()); continue; } @@ -103,6 +97,8 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { continue; } + // TODO: add a check to see if the task is already running for the table + // get server to segment mappings PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager(); Map<String, List<String>> serverToSegments = pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); @@ -113,27 +109,21 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { throw new RuntimeException(e); } - Map<String, SegmentZKMetadata> completedSegmentsMap = - completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity())); + ServerSegmentMetadataReader serverSegmentMetadataReader = + new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(), + _clusterInfoAccessor.getConnectionManager()); - List<String> validDocIdUrls; - try { - validDocIdUrls = getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints, tableNameWithType, - completedSegmentsMap.keySet()); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } + // TODO: currently, we put segmentNames=null to get metadata for all segments. We can change this to get + // valid doc id metadata in batches with the loop. 
+ List<ValidDocIdMetadataInfo> validDocIdMetadataList = + serverSegmentMetadataReader.getValidDocIdMetadataFromServer(tableNameWithType, serverToSegments, + serverToEndpoints, null, 60_000); - // request the urls from the servers - CompletionServiceHelper completionServiceHelper = - new CompletionServiceHelper(_clusterInfoAccessor.getExecutor(), _clusterInfoAccessor.getConnectionManager(), - serverToEndpoints.inverse()); - - CompletionServiceHelper.CompletionServiceResponse serviceResponse = - completionServiceHelper.doMultiGetRequest(validDocIdUrls, tableNameWithType, false, 3000); + Map<String, SegmentZKMetadata> completedSegmentsMap = + completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity())); SegmentSelectionResult segmentSelectionResult = - processValidDocIdMetadata(taskConfigs, completedSegmentsMap, serviceResponse._httpResponses.entrySet()); + processValidDocIdMetadata(taskConfigs, completedSegmentsMap, validDocIdMetadataList); if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) { pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentSelectionResult.getSegmentsForDeletion(), @@ -163,7 +153,7 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { @VisibleForTesting public static SegmentSelectionResult processValidDocIdMetadata(Map<String, String> taskConfigs, - Map<String, SegmentZKMetadata> completedSegmentsMap, Set<Map.Entry<String, String>> responseSet) { + Map<String, SegmentZKMetadata> completedSegmentsMap, List<ValidDocIdMetadataInfo> validDocIdMetadataInfoList) { double invalidRecordsThresholdPercent = Double.parseDouble( taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT))); @@ -172,62 +162,22 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT))); List<SegmentZKMetadata> segmentsForCompaction = 
new ArrayList<>(); List<String> segmentsForDeletion = new ArrayList<>(); - for (Map.Entry<String, String> streamResponse : responseSet) { - JsonNode allValidDocIdMetadata; - try { - allValidDocIdMetadata = JsonUtils.stringToJsonNode(streamResponse.getValue()); - } catch (IOException e) { - LOGGER.error("Unable to parse validDocIdMetadata response for: {}", streamResponse.getKey()); - continue; - } - Iterator<JsonNode> iterator = allValidDocIdMetadata.elements(); - while (iterator.hasNext()) { - JsonNode validDocIdMetadata = iterator.next(); - long totalInvalidDocs = validDocIdMetadata.get("totalInvalidDocs").asLong(); - String segmentName = validDocIdMetadata.get("segmentName").asText(); - SegmentZKMetadata segment = completedSegmentsMap.get(segmentName); - long totalDocs = validDocIdMetadata.get("totalDocs").asLong(); - double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100; - if (totalInvalidDocs == totalDocs) { - segmentsForDeletion.add(segment.getSegmentName()); - } else if (invalidRecordPercent > invalidRecordsThresholdPercent - && totalInvalidDocs > invalidRecordsThresholdCount) { - segmentsForCompaction.add(segment); - } + for (ValidDocIdMetadataInfo validDocIdMetadata : validDocIdMetadataInfoList) { + long totalInvalidDocs = validDocIdMetadata.getTotalInvalidDocs(); + String segmentName = validDocIdMetadata.getSegmentName(); + SegmentZKMetadata segment = completedSegmentsMap.get(segmentName); + long totalDocs = validDocIdMetadata.getTotalDocs(); + double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100; + if (totalInvalidDocs == totalDocs) { + segmentsForDeletion.add(segment.getSegmentName()); + } else if (invalidRecordPercent > invalidRecordsThresholdPercent + && totalInvalidDocs > invalidRecordsThresholdCount) { + segmentsForCompaction.add(segment); } } return new SegmentSelectionResult(segmentsForCompaction, segmentsForDeletion); } - @VisibleForTesting - public static List<String> 
getValidDocIdMetadataUrls(Map<String, List<String>> serverToSegments, - BiMap<String, String> serverToEndpoints, String tableNameWithType, Set<String> completedSegments) - throws URISyntaxException { - Set<String> remainingSegments = new HashSet<>(completedSegments); - List<String> urls = new ArrayList<>(); - for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) { - if (remainingSegments.isEmpty()) { - break; - } - String server = entry.getKey(); - List<String> segmentNames = entry.getValue(); - URIBuilder uriBuilder = new URIBuilder(serverToEndpoints.get(server)).setPath( - String.format("/tables/%s/validDocIdMetadata", tableNameWithType)); - int completedSegmentCountPerServer = 0; - for (String segmentName : segmentNames) { - if (remainingSegments.remove(segmentName)) { - completedSegmentCountPerServer++; - uriBuilder.addParameter("segmentNames", segmentName); - } - } - if (completedSegmentCountPerServer > 0) { - // only add to the list if the server has completed segments - urls.add(uriBuilder.toString()); - } - } - return urls; - } - private List<SegmentZKMetadata> getCompletedSegments(String tableNameWithType, Map<String, String> taskConfigs) { List<SegmentZKMetadata> completedSegments = new ArrayList<>(); String bufferPeriod = taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java index 604c58f6d0..0869880ccf 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java +++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java @@ -24,6 +24,7 @@ import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.model.ExternalView; import org.apache.pinot.minion.MinionContext; +import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.mockito.Mockito; import org.testng.Assert; @@ -51,13 +52,15 @@ public class UpsertCompactionTaskExecutorTest { Mockito.when(helixManager.getClusterManagmentTool()).thenReturn(clusterManagementTool); minionContext.setHelixManager(helixManager); - String server = UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME); + String server = MinionTaskUtils.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME, helixManager.getClusterManagmentTool(), + helixManager.getClusterName()); Assert.assertEquals(server, "server1"); // verify exception thrown with OFFLINE server map.put("server1", SegmentStateModel.OFFLINE); Assert.assertThrows(IllegalStateException.class, - () -> UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME)); + () -> MinionTaskUtils.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME, + helixManager.getClusterManagmentTool(), helixManager.getClusterName())); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java index 03908a958e..7da9a7f1e8 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java +++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java @@ -18,20 +18,17 @@ */ package org.apache.pinot.plugin.minion.tasks.upsertcompaction; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import java.net.URISyntaxException; -import java.util.AbstractMap; +import com.fasterxml.jackson.core.type.TypeReference; +import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.helix.model.IdealState; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask; @@ -41,6 +38,7 @@ import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.BeforeClass; @@ -171,57 +169,6 @@ public class UpsertCompactionTaskGeneratorTest { assertEquals(pinotTaskConfigs.size(), 0); } - @Test - public void testGetValidDocIdMetadataUrls() - throws URISyntaxException { - Map<String, List<String>> serverToSegments = new HashMap<>(); - serverToSegments.put("server1", - Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName())); - serverToSegments.put("server2", 
Lists.newArrayList("consumingSegment")); - BiMap<String, String> serverToEndpoints = HashBiMap.create(1); - serverToEndpoints.put("server1", "http://endpoint1"); - serverToEndpoints.put("server2", "http://endpoint2"); - Set<String> completedSegments = new HashSet<>(); - completedSegments.add(_completedSegment.getSegmentName()); - completedSegments.add(_completedSegment2.getSegmentName()); - - List<String> validDocIdUrls = - UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints, - REALTIME_TABLE_NAME, completedSegments); - - String expectedUrl = - String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s", "http://endpoint1", - REALTIME_TABLE_NAME, _completedSegment.getSegmentName(), _completedSegment2.getSegmentName()); - assertEquals(validDocIdUrls.get(0), expectedUrl); - assertEquals(validDocIdUrls.size(), 1); - } - - @Test - public void testGetValidDocIdMetadataUrlsWithReplicatedSegments() - throws URISyntaxException { - Map<String, List<String>> serverToSegments = new LinkedHashMap<>(); - serverToSegments.put("server1", - Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName())); - serverToSegments.put("server2", - Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName())); - BiMap<String, String> serverToEndpoints = HashBiMap.create(1); - serverToEndpoints.put("server1", "http://endpoint1"); - serverToEndpoints.put("server2", "http://endpoint2"); - Set<String> completedSegments = new HashSet<>(); - completedSegments.add(_completedSegment.getSegmentName()); - completedSegments.add(_completedSegment2.getSegmentName()); - - List<String> validDocIdUrls = - UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints, - REALTIME_TABLE_NAME, completedSegments); - - String expectedUrl = - String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s", "http://endpoint1", - REALTIME_TABLE_NAME, 
_completedSegment.getSegmentName(), _completedSegment2.getSegmentName()); - assertEquals(validDocIdUrls.get(0), expectedUrl); - assertEquals(validDocIdUrls.size(), 1); - } - @Test public void testGetMaxTasks() { Map<String, String> taskConfigs = new HashMap<>(); @@ -234,16 +181,20 @@ public class UpsertCompactionTaskGeneratorTest { } @Test - public void testProcessValidDocIdMetadata() { + public void testProcessValidDocIdMetadata() + throws IOException { Map<String, String> compactionConfigs = getCompactionConfigs("1", "10"); - Set<Map.Entry<String, String>> responseSet = new HashSet<>(); + List<ValidDocIdMetadataInfo> validDocIdMetadataInfoList = new ArrayList<>(); String json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \"" + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + "}," + "{" + "\"totalValidDocs\" : 0," + "\"totalInvalidDocs\" : 10," + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\"," + "\"totalDocs\" : 10" + "}]"; - responseSet.add(new AbstractMap.SimpleEntry<>("", json)); + List<ValidDocIdMetadataInfo> validDocIdMetadataInfo = + JsonUtils.stringToObject(json, new TypeReference<ArrayList<ValidDocIdMetadataInfo>>() { + }); UpsertCompactionTaskGenerator.SegmentSelectionResult segmentSelectionResult = - UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet); + UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, + validDocIdMetadataInfo); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), _completedSegment.getSegmentName()); assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName()); @@ -251,20 +202,23 @@ public class UpsertCompactionTaskGeneratorTest { // test with a higher invalidRecordsThresholdPercent compactionConfigs = getCompactionConfigs("60", "10"); segmentSelectionResult = - 
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet); + UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, + validDocIdMetadataInfo); assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty()); // test without an invalidRecordsThresholdPercent compactionConfigs = getCompactionConfigs("0", "10"); segmentSelectionResult = - UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet); + UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, + validDocIdMetadataInfo); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), _completedSegment.getSegmentName()); // test without a invalidRecordsThresholdCount compactionConfigs = getCompactionConfigs("30", "0"); segmentSelectionResult = - UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet); + UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, + validDocIdMetadataInfo); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), _completedSegment.getSegmentName()); } @@ -284,7 +238,7 @@ public class UpsertCompactionTaskGeneratorTest { private IdealState getIdealState(String tableName, List<String> segmentNames) { IdealState idealState = new IdealState(tableName); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - for (String segmentName: segmentNames) { + for (String segmentName : segmentNames) { idealState.setPartitionState(segmentName, "Server_0", "ONLINE"); } return idealState; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java new 
file mode 100644 index 0000000000..2795982ab6 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.segment.readers; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.RoaringBitmap; + + +/** + * Compacted Pinot Segment Record Reader used for upsert compaction + */ +public class CompactedPinotSegmentRecordReader implements RecordReader { + private final PinotSegmentRecordReader _pinotSegmentRecordReader; + private final PeekableIntIterator _validDocIdsIterator; + // Reusable generic row to store the next row to return + GenericRow _nextRow = new GenericRow(); + // Flag to mark whether we need to fetch another row + boolean _nextRowReturned = true; + + public CompactedPinotSegmentRecordReader(File indexDir, RoaringBitmap validDocIds) { + _pinotSegmentRecordReader = new PinotSegmentRecordReader(); + _pinotSegmentRecordReader.init(indexDir, null, null); + _validDocIdsIterator = validDocIds.getIntIterator(); + } + + @Override + public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) + throws IOException { + } + + @Override + public boolean hasNext() { + if (!_validDocIdsIterator.hasNext() && _nextRowReturned) { + return false; + } + + // If next row has not been returned, return true + if (!_nextRowReturned) { + return true; + } + + // Try to get the next row to return + if (_validDocIdsIterator.hasNext()) { + int docId = _validDocIdsIterator.next(); + _nextRow.clear(); + _pinotSegmentRecordReader.getRecord(docId, _nextRow); + _nextRowReturned = false; + return true; + } + + // Cannot find next row to return, return false + return false; + } + + @Override + public GenericRow next() + throws IOException { + return 
next(new GenericRow()); + } + + @Override + public GenericRow next(GenericRow reuse) + throws IOException { + Preconditions.checkState(!_nextRowReturned); + reuse.init(_nextRow); + _nextRowReturned = true; + return reuse; + } + + @Override + public void rewind() + throws IOException { + _pinotSegmentRecordReader.rewind(); + _nextRowReturned = true; + } + + @Override + public void close() + throws IOException { + _pinotSegmentRecordReader.close(); + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index b08833b966..29f90715da 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -529,31 +529,64 @@ public class TablesResource { public String getValidDocIdMetadata( @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME") @PathParam("tableNameWithType") String tableNameWithType, - @ApiParam(value = "Segment name", allowMultiple = true, required = true) @QueryParam("segmentNames") + @ApiParam(value = "Segment name", allowMultiple = true) @QueryParam("segmentNames") List<String> segmentNames) { + return ResourceUtils.convertToJsonString(processValidDocIdMetadata(tableNameWithType, segmentNames)); + } + + @POST + @Path("/tables/{tableNameWithType}/validDocIdMetadata") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Provides segment validDocId metadata", notes = "Provides segment validDocId metadata") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), + @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class), + @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class) + }) + public String getValidDocIdMetadata( + @ApiParam(value = "Table name including 
type", required = true, example = "myTable_REALTIME") + @PathParam("tableNameWithType") String tableNameWithType, TableSegments tableSegments) { + List<String> segmentNames = tableSegments.getSegments(); + return ResourceUtils.convertToJsonString(processValidDocIdMetadata(tableNameWithType, segmentNames)); + } + + private List<Map<String, Object>> processValidDocIdMetadata(String tableNameWithType, List<String> segments) { TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType); List<String> missingSegments = new ArrayList<>(); - List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(segmentNames, missingSegments); - if (!missingSegments.isEmpty()) { - throw new WebApplicationException(String.format("Table %s has missing segments", tableNameWithType), - Response.Status.NOT_FOUND); + List<SegmentDataManager> segmentDataManagers; + if (segments == null) { + segmentDataManagers = tableDataManager.acquireAllSegments(); + } else { + segmentDataManagers = tableDataManager.acquireSegments(segments, missingSegments); + if (!missingSegments.isEmpty()) { + throw new WebApplicationException( + String.format("Table %s has missing segments: %s", tableNameWithType, segments), + Response.Status.NOT_FOUND); + } } List<Map<String, Object>> allValidDocIdMetadata = new ArrayList<>(); for (SegmentDataManager segmentDataManager : segmentDataManagers) { try { IndexSegment indexSegment = segmentDataManager.getSegment(); + if (indexSegment == null) { + LOGGER.warn("Table {} segment {} does not exist", tableNameWithType, segmentDataManager.getSegmentName()); + continue; + } + // Skip the consuming segments if (!(indexSegment instanceof ImmutableSegmentImpl)) { - throw new WebApplicationException( - String.format("Table %s segment %s is not a immutable segment", tableNameWithType, - segmentDataManager.getSegmentName()), Response.Status.BAD_REQUEST); + String msg = String.format("Table %s segment %s is not 
an immutable segment", tableNameWithType, + segmentDataManager.getSegmentName()); + LOGGER.warn(msg); + continue; } MutableRoaringBitmap validDocIds = indexSegment.getValidDocIds() != null ? indexSegment.getValidDocIds().getMutableRoaringBitmap() : null; if (validDocIds == null) { - throw new WebApplicationException( - String.format("Missing validDocIds for table %s segment %s does not exist", tableNameWithType, - segmentDataManager.getSegmentName()), Response.Status.NOT_FOUND); + String msg = String.format("Missing validDocIds for table %s segment %s", tableNameWithType, + segmentDataManager.getSegmentName()); + LOGGER.warn(msg); + throw new WebApplicationException(msg, Response.Status.NOT_FOUND); } Map<String, Object> validDocIdMetadata = new HashMap<>(); int totalDocs = indexSegment.getSegmentMetadata().getTotalDocs(); @@ -568,7 +601,7 @@ public class TablesResource { tableDataManager.releaseSegment(segmentDataManager); } } - return ResourceUtils.convertToJsonString(allValidDocIdMetadata); + return allValidDocIdMetadata; } /** diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java index 0469d805d2..79b17396de 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.ws.rs.client.Entity; import javax.ws.rs.core.Response; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.commons.io.FileUtils; @@ -311,6 +312,27 @@ public class TablesResourceTest extends BaseResourceTest { Assert.assertEquals(validDocIdMetadata.get("totalInvalidDocs").asInt(), 99992); } + @Test + public void testValidDocIdMetadataPost() + throws IOException { + IndexSegment segment = 
_realtimeIndexSegments.get(0); + // Verify the content of the downloaded snapshot from a realtime table. + downLoadAndVerifyValidDocIdsSnapshot(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), + (ImmutableSegmentImpl) segment); + List<String> segments = List.of(segment.getSegmentName()); + TableSegments tableSegments = new TableSegments(segments); + String validDocIdMetadataPath = + "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/validDocIdMetadata"; + String response = + _webTarget.path(validDocIdMetadataPath).queryParam("segmentNames", segment.getSegmentName()).request() + .post(Entity.json(tableSegments), String.class); + JsonNode validDocIdMetadata = JsonUtils.stringToJsonNode(response).get(0); + + Assert.assertEquals(validDocIdMetadata.get("totalDocs").asInt(), 100000); + Assert.assertEquals(validDocIdMetadata.get("totalValidDocs").asInt(), 8); + Assert.assertEquals(validDocIdMetadata.get("totalInvalidDocs").asInt(), 99992); + } + // Verify metadata file from segments. private void downLoadAndVerifySegmentContent(String tableNameWithType, IndexSegment segment) throws IOException, ConfigurationException { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org