This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ae97265d86 #12117 Support for Server & Controller API to check for Segments reload of a table in servers (#13789) ae97265d86 is described below commit ae97265d863ce1dd4c67a85bc8da0120e35fa144 Author: Chaitanya Deepthi <45308220+deepthi...@users.noreply.github.com> AuthorDate: Wed Aug 28 12:07:28 2024 -0700 #12117 Support for Server & Controller API to check for Segments reload of a table in servers (#13789) --- .../ServerSegmentsReloadCheckResponse.java | 53 +++++++++++++ .../TableSegmentsReloadCheckResponse.java | 55 +++++++++++++ .../utils/SegmentsReloadCheckResponseTest.java | 90 ++++++++++++++++++++++ .../api/resources/PinotSegmentRestletResource.java | 43 +++++++++++ .../controller/helix/ControllerRequestClient.java | 11 +++ .../util/ServerSegmentMetadataReader.java | 42 ++++++++++ .../pinot/controller/util/TableMetadataReader.java | 22 ++++++ .../pinot/controller/helix/ControllerTest.java | 5 ++ .../core/data/manager/BaseTableDataManager.java | 26 +++++++ .../tests/BaseClusterIntegrationTestSet.java | 48 +++++++++++- .../local/data/manager/TableDataManager.java | 7 ++ .../immutable/ImmutableSegmentImpl.java | 9 +++ .../pinot/server/api/resources/TablesResource.java | 25 ++++++ .../utils/builder/ControllerRequestURLBuilder.java | 5 ++ 14 files changed, 439 insertions(+), 2 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ServerSegmentsReloadCheckResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ServerSegmentsReloadCheckResponse.java new file mode 100644 index 0000000000..2469bda404 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ServerSegmentsReloadCheckResponse.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.restlet.resources; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + + +/** + * This class gives the data of a server if there exists any segments that need to be reloaded + * + * It has details of server id and returns true/false if there are any segments to be reloaded or not. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class ServerSegmentsReloadCheckResponse { + @JsonProperty("needReload") + private final boolean _needReload; + + @JsonProperty("instanceId") + private final String _instanceId; + + public boolean isNeedReload() { + return _needReload; + } + + public String getInstanceId() { + return _instanceId; + } + + @JsonCreator + public ServerSegmentsReloadCheckResponse(@JsonProperty("needReload") boolean needReload, + @JsonProperty("instanceId") String instanceId) { + _needReload = needReload; + _instanceId = instanceId; + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableSegmentsReloadCheckResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableSegmentsReloadCheckResponse.java new file mode 100644 index 0000000000..bd201870a4 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableSegmentsReloadCheckResponse.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.restlet.resources; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; + + +/** + * This class gives list of the details from each server if there exists any segments that need to be reloaded + * + * It has details of reload flag which returns true if reload is needed on table and additional details of the + * respective servers. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class TableSegmentsReloadCheckResponse { + @JsonProperty("needReload") + boolean _needReload; + @JsonProperty("serverToSegmentsCheckReloadList") + Map<String, ServerSegmentsReloadCheckResponse> _serverToSegmentsCheckReloadList; + + public boolean isNeedReload() { + return _needReload; + } + + public Map<String, ServerSegmentsReloadCheckResponse> getServerToSegmentsCheckReloadList() { + return _serverToSegmentsCheckReloadList; + } + + @JsonCreator + public TableSegmentsReloadCheckResponse(@JsonProperty("needReload") boolean needReload, + @JsonProperty("serverToSegmentsCheckReloadList") + Map<String, ServerSegmentsReloadCheckResponse> serverToSegmentsCheckReloadList) { + _needReload = needReload; + _serverToSegmentsCheckReloadList = serverToSegmentsCheckReloadList; + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentsReloadCheckResponseTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentsReloadCheckResponseTest.java new file mode 100644 index 0000000000..f63a607a12 --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentsReloadCheckResponseTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse; +import org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse; +import org.apache.pinot.spi.utils.JsonUtils; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/** + * Tests some of the serializer and deserialization responses from SegmentsReloadCheckResponse class + * needReload will have to be carefully evaluated + */ +public class SegmentsReloadCheckResponseTest { + + @Test + public void testSerialization() + throws IOException { + // Given + boolean needReload = true; + String instanceId = "instance123"; + ServerSegmentsReloadCheckResponse response = new ServerSegmentsReloadCheckResponse(needReload, instanceId); + Map<String, ServerSegmentsReloadCheckResponse> serversResponse = new HashMap<>(); + serversResponse.put(instanceId, response); + TableSegmentsReloadCheckResponse tableResponse = new TableSegmentsReloadCheckResponse(needReload, serversResponse); + String responseString = JsonUtils.objectToPrettyString(response); + String tableResponseString = JsonUtils.objectToPrettyString(tableResponse); + + assertNotNull(responseString); + assertNotNull(tableResponseString); + JsonNode tableResponseJsonNode = JsonUtils.stringToJsonNode(tableResponseString); + assertTrue(tableResponseJsonNode.get("needReload").asBoolean()); + + JsonNode serversList = tableResponseJsonNode.get("serverToSegmentsCheckReloadList"); + JsonNode serverResp = serversList.get("instance123"); + assertEquals(serverResp.get("instanceId").asText(), "instance123"); + assertTrue(serverResp.get("needReload").asBoolean()); + + assertEquals("{\n" + " \"needReload\" : true,\n" + " \"serverToSegmentsCheckReloadList\" : {\n" + + " \"instance123\" : {\n" + " \"needReload\" : true,\n" + " \"instanceId\" : \"instance123\"\n" + + " }\n" + " }\n" + "}", tableResponseString); + assertEquals("{\n" + " \"needReload\" : true,\n" + " \"instanceId\" : \"instance123\"\n" + "}", responseString); + } + + @Test + public void testDeserialization() + throws Exception { + String jsonResponse = "{\n" + " \"needReload\": false,\n" + " \"serverToSegmentsCheckReloadList\": {\n" + + " \"Server_10.0.0.215_7050\": {\n" + " \"needReload\": false,\n" + + " \"instanceId\": \"Server_10.0.0.215_7050\"\n" + " }\n" + " }\n" + "}"; + JsonNode jsonNode = JsonUtils.stringToJsonNode(jsonResponse); + TableSegmentsReloadCheckResponse tableReloadResponse = + JsonUtils.stringToObject(jsonResponse, new TypeReference<TableSegmentsReloadCheckResponse>() { + }); + // Then + assertNotNull(jsonNode); + assertFalse(tableReloadResponse.isNeedReload()); + assertNotNull(tableReloadResponse.getServerToSegmentsCheckReloadList()); + Map<String, ServerSegmentsReloadCheckResponse> serverSegmentReloadResp = + tableReloadResponse.getServerToSegmentsCheckReloadList(); + assertEquals(serverSegmentReloadResp.get("Server_10.0.0.215_7050").isNeedReload(), false); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index c75469d68c..74c09d8da7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -69,6 +69,8 @@ import org.apache.pinot.common.lineage.SegmentLineage; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse; +import org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse; import org.apache.pinot.common.utils.DatabaseUtils; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.URIUtils; @@ -822,6 +824,47 @@ public class PinotSegmentRestletResource { return segmentsMetadata; } + @GET + @Path("segments/{tableNameWithType}/needReload") + @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action = Actions.Table.GET_METADATA) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Gets the metadata of reload segments check from servers hosting the table", notes = + "Returns true if reload is needed on the table from any one of the servers") + public String getTableReloadMetadata( + @ApiParam(value = "Table name with type", required = true, example = "myTable_REALTIME") + @PathParam("tableNameWithType") String tableNameWithType, + @QueryParam("verbose") @DefaultValue("false") boolean verbose, @Context HttpHeaders headers) { + tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); + LOGGER.info("Received a request to check reload for all servers hosting segments for table {}", tableNameWithType); + try { + TableMetadataReader tableMetadataReader = + new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager); + Map<String, JsonNode> needReloadMetadata = + tableMetadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType, + _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + boolean needReload = + needReloadMetadata.values().stream().anyMatch(value -> value.get("needReload").booleanValue()); + Map<String, ServerSegmentsReloadCheckResponse> serverResponses = new HashMap<>(); + TableSegmentsReloadCheckResponse tableNeedReloadResponse; + if (verbose) { + for (Map.Entry<String, JsonNode> entry : needReloadMetadata.entrySet()) { + serverResponses.put(entry.getKey(), + new ServerSegmentsReloadCheckResponse(entry.getValue().get("needReload").booleanValue(), + entry.getValue().get("instanceId").asText())); + } + tableNeedReloadResponse = new TableSegmentsReloadCheckResponse(needReload, serverResponses); + } else { + tableNeedReloadResponse = new TableSegmentsReloadCheckResponse(needReload, serverResponses); + } + return JsonUtils.objectToPrettyString(tableNeedReloadResponse); + } catch (InvalidConfigException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Status.BAD_REQUEST); + } catch (IOException ioe) { + throw new ControllerApplicationException(LOGGER, "Error parsing Pinot server response: " + ioe.getMessage(), + Status.INTERNAL_SERVER_ERROR, ioe); + } + } + @GET @Path("segments/{tableName}/zkmetadata") @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_METADATA) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java index 18676d581d..5f8f7d3190 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java @@ -201,6 +201,17 @@ public class ControllerRequestClient { } } + public String checkIfReloadIsNeeded(String tableNameWithType, Boolean verbose) + throws IOException { + try { + SimpleHttpResponse simpleHttpResponse = HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest( + new URI(_controllerRequestURLBuilder.forTableNeedReload(tableNameWithType, verbose)), _headers, null)); + return simpleHttpResponse.getResponse(); + } catch (HttpErrorStatusException | URISyntaxException e) { + throw new IOException(e); + } + } + public void reloadSegment(String tableName, String segmentName, boolean forceReload) throws IOException { try { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index b3fd851ff4..781140a978 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -214,6 +214,43 @@ public class ServerSegmentMetadataReader { return segmentsMetadata; } + /** + * This method is called when the API request is to fetch data about segment reload of the table. + * This method makes a MultiGet call to all servers that host their respective segments and gets the results. + * This method will return metadata of all the servers along with need reload flag. + * In future additional details like segments list can also be added + */ + public List<String> getCheckReloadSegmentsFromServer(String tableNameWithType, Set<String> serverInstances, + BiMap<String, String> endpoints, int timeoutMs) { + LOGGER.debug("Checking if reload is needed on segments from servers for table {}.", tableNameWithType); + List<String> serverURLs = new ArrayList<>(); + for (String serverInstance : serverInstances) { + serverURLs.add(generateCheckReloadSegmentsServerURL(tableNameWithType, endpoints.get(serverInstance))); + } + BiMap<String, String> endpointsToServers = endpoints.inverse(); + CompletionServiceHelper completionServiceHelper = + new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers); + CompletionServiceHelper.CompletionServiceResponse serviceResponse = + completionServiceHelper.doMultiGetRequest(serverURLs, tableNameWithType, true, timeoutMs); + List<String> serversNeedReloadResponses = new ArrayList<>(); + + int failedParses = 0; + for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) { + try { + serversNeedReloadResponses.add(streamResponse.getValue()); + } catch (Exception e) { + failedParses++; + LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e); + } + } + if (failedParses != 0) { + LOGGER.error("Unable to parse server {} / {} response due to an error: ", failedParses, serverURLs.size()); + } + + LOGGER.debug("Retrieved metadata of reload check from servers."); + return serversNeedReloadResponses; + } + /** * This method is called when the API request is to fetch validDocId metadata for a list segments of the given table. * This method will pick one server randomly that hosts the target segment and fetch the segment metadata result. @@ -375,6 +412,11 @@ public class ServerSegmentMetadataReader { return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint, tableNameWithType, segmentName, paramsStr); } + private String generateCheckReloadSegmentsServerURL(String tableNameWithType, String endpoint) { + tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8); + return String.format("%s/tables/%s/segments/needReload", endpoint, tableNameWithType); + } + @Deprecated private String generateValidDocIdsURL(String tableNameWithType, String segmentName, String validDocIdsType, String endpoint) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java index adf7e3a7b7..a7a53d421d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java @@ -23,6 +23,7 @@ import com.google.common.collect.BiMap; import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -33,7 +34,9 @@ import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.restlet.resources.TableMetadataInfo; import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; /** @@ -55,6 +58,25 @@ public class TableMetadataReader { _pinotHelixResourceManager = helixResourceManager; } + public Map<String, JsonNode> getServerCheckSegmentsReloadMetadata(String tableNameWithType, int timeoutMs) + throws InvalidConfigException, IOException { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + List<String> serverInstances = _pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType, tableType); + Set<String> serverInstanceSet = new HashSet<>(serverInstances); + BiMap<String, String> endpoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverInstanceSet); + ServerSegmentMetadataReader serverSegmentMetadataReader = + new ServerSegmentMetadataReader(_executor, _connectionManager); + List<String> segmentsMetadata = + serverSegmentMetadataReader.getCheckReloadSegmentsFromServer(tableNameWithType, serverInstanceSet, endpoints, + timeoutMs); + Map<String, JsonNode> response = new HashMap<>(); + for (String segmentMetadata : segmentsMetadata) { + JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata); + response.put(responseJson.get("instanceId").asText(), responseJson); + } + return response; + } + /** * This api takes in list of segments for which we need the metadata. */ diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 466de32d25..98ddbac73a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -721,6 +721,11 @@ public class ControllerTest { return getControllerRequestClient().reloadTable(tableName, TableType.OFFLINE, forceDownload); } + public String checkIfReloadIsNeeded(String tableNameWithType, Boolean verbose) + throws IOException { + return getControllerRequestClient().checkIfReloadIsNeeded(tableNameWithType, verbose); + } + public void reloadOfflineSegment(String tableName, String segmentName, boolean forceDownload) throws IOException { getControllerRequestClient().reloadSegment(tableName, segmentName, forceDownload); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 4f24dc3d52..56d2cb35d6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -61,6 +61,7 @@ import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; import org.apache.pinot.core.util.PeerServerSegmentFinder; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; @@ -1024,6 +1025,31 @@ public abstract class BaseTableDataManager implements TableDataManager { } } + @Override + public boolean needReloadSegments() + throws Exception { + IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig(); + List<SegmentDataManager> segmentDataManagers = acquireAllSegments(); + boolean needReload = false; + try { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + IndexSegment segment = segmentDataManager.getSegment(); + if (segment instanceof ImmutableSegmentImpl) { + ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment; + if (immutableSegment.isReloadNeeded(indexLoadingConfig)) { + needReload = true; + break; + } + } + } + } finally { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + releaseSegment(segmentDataManager); + } + } + return needReload; + } + private SegmentDirectory initSegmentDirectory(String segmentName, String segmentCrc, IndexLoadingConfig indexLoadingConfig) throws Exception { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java index 09310cb243..ca27ed4ef4 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java @@ -732,11 +732,34 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati // Upload the schema with extra columns addSchema(schema); - + String tableNameWithTypeOffline = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName); + String tableNameWithTypeRealtime = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName); // Reload the table if (includeOfflineTable) { + //test controller api which gives responses if reload is needed on any of the server segments when default + // columns are added + String needBeforeReloadResponseWithNoVerbose = checkIfReloadIsNeeded(tableNameWithTypeOffline, false); + String needBeforeReloadResponseWithVerbose = checkIfReloadIsNeeded(tableNameWithTypeOffline, true); + JsonNode jsonNeedReloadResponseWithNoVerbose = JsonUtils.stringToJsonNode(needBeforeReloadResponseWithNoVerbose); + JsonNode jsonNeedReloadResponseWithVerbose = JsonUtils.stringToJsonNode(needBeforeReloadResponseWithVerbose); + //test to check if reload is needed i.e true + assertTrue(jsonNeedReloadResponseWithNoVerbose.get("needReload").asBoolean()); + assertTrue(jsonNeedReloadResponseWithVerbose.get("needReload").asBoolean()); + assertFalse(jsonNeedReloadResponseWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty()); reloadOfflineTable(rawTableName); } + //test controller api which gives responses if reload is needed on any of the server segments when default + // columns are added + String needBeforeReloadResponseRealtimeWithNoVerbose = checkIfReloadIsNeeded(tableNameWithTypeRealtime, false); + String needBeforeReloadResponseRealtimeWithVerbose = checkIfReloadIsNeeded(tableNameWithTypeRealtime, true); + JsonNode jsonNeedReloadResponseRealTimeWithNoVerbose = + JsonUtils.stringToJsonNode(needBeforeReloadResponseRealtimeWithNoVerbose); + JsonNode jsonNeedReloadResponseRealTimeWithVerbose = + JsonUtils.stringToJsonNode(needBeforeReloadResponseRealtimeWithVerbose); + //test to check if reload is needed i.e true + assertTrue(jsonNeedReloadResponseRealTimeWithNoVerbose.get("needReload").asBoolean()); + assertTrue(jsonNeedReloadResponseRealTimeWithVerbose.get("needReload").asBoolean()); + assertFalse(jsonNeedReloadResponseRealTimeWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty()); reloadRealtimeTable(rawTableName); // Wait for all segments to finish reloading, and test querying the new columns @@ -762,7 +785,6 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati JsonNode resultTable = queryResponse.get("resultTable"); assertEquals(resultTable.get("dataSchema").get("columnNames").size(), schema.size()); assertEquals(resultTable.get("rows").size(), 10); - // Test aggregation query to include querying all segemnts (including realtime) String aggregationQuery = "SELECT SUMMV(NewIntMVDimension) FROM " + rawTableName; queryResponse = postQuery(aggregationQuery); @@ -778,6 +800,28 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati queryResponse = postQuery(countStarQuery); assertEquals(queryResponse.get("exceptions").size(), 0); assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asLong(), countStarResult); + if (includeOfflineTable) { + String needAfterReloadResponseWithNoVerbose = checkIfReloadIsNeeded(tableNameWithTypeOffline, false); + String needAfterReloadResponseWithVerbose = checkIfReloadIsNeeded(tableNameWithTypeOffline, true); + JsonNode jsonNeedReloadResponseAfterWithNoVerbose = + JsonUtils.stringToJsonNode(needAfterReloadResponseWithNoVerbose); + JsonNode jsonNeedReloadResponseAfterWithVerbose = JsonUtils.stringToJsonNode(needAfterReloadResponseWithVerbose); + //test to check if reload on offline table is needed i.e false after reload is finished + assertFalse(jsonNeedReloadResponseAfterWithNoVerbose.get("needReload").asBoolean()); + assertFalse(jsonNeedReloadResponseAfterWithVerbose.get("needReload").asBoolean()); + assertFalse(jsonNeedReloadResponseRealTimeWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty()); + } + String needAfterReloadResponseRealtimeWithNoVerbose = checkIfReloadIsNeeded(tableNameWithTypeRealtime, false); + String needAfterReloadResponseRealTimeWithVerbose = checkIfReloadIsNeeded(tableNameWithTypeRealtime, true); + JsonNode jsonNeedReloadResponseRealtimeAfterWithNoVerbose = + JsonUtils.stringToJsonNode(needAfterReloadResponseRealtimeWithNoVerbose); + JsonNode jsonNeedReloadResponseRealtimeAfterWithVerbose = + JsonUtils.stringToJsonNode(needAfterReloadResponseRealTimeWithVerbose); + + //test to check if reload on real time table is needed i.e false after reload is finished + assertFalse(jsonNeedReloadResponseRealtimeAfterWithNoVerbose.get("needReload").asBoolean()); + assertFalse(jsonNeedReloadResponseRealtimeAfterWithVerbose.get("needReload").asBoolean()); + assertFalse(jsonNeedReloadResponseRealtimeAfterWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty()); } private DimensionFieldSpec constructNewDimension(FieldSpec.DataType dataType, boolean singleValue) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 677d659fff..480b2ba70b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -122,6 +122,13 @@ public interface TableDataManager { */ boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig); + /** + * Check if reload is needed for any of the segments of a table + * @return true if reload is needed for any of the segments and false otherwise + */ + boolean needReloadSegments() + throws Exception; + /** * Downloads a segment and loads it into the table. * NOTE: This method is part of the implementation detail of {@link #addOnlineSegment(String)}. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java index 76129dabb8..14546d7ba6 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java @@ -33,6 +33,7 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer; @@ -173,6 +174,14 @@ public class ImmutableSegmentImpl implements ImmutableSegment { V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME); } + /** + * if re processing or reload is needed on a segment then return true + */ + public boolean isReloadNeeded(IndexLoadingConfig indexLoadingConfig) + throws Exception { + return ImmutableSegmentLoader.needPreprocess(_segmentDirectory, indexLoadingConfig, indexLoadingConfig.getSchema()); + } + @Override public <I extends IndexReader> I getIndex(String column, IndexType<?, I, ?> type) { ColumnIndexContainer container = _indexContainerMap.get(column); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index 228ba2277b..ce85ec3f31 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -64,6 +64,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.response.server.TableIndexMetadataResponse; import org.apache.pinot.common.restlet.resources.ResourceUtils; import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo; +import org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse; import org.apache.pinot.common.restlet.resources.TableMetadataInfo; import org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo; import org.apache.pinot.common.restlet.resources.TableSegments; @@ -954,4 +955,28 @@ public class TablesResource { } return new TableSegmentValidationInfo(true, maxEndTimeMs); } + + @GET + @Path("/tables/{tableName}/segments/needReload") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Checks if reload is needed on any segment", notes = "Returns true if reload is required on" + + " any segment in this server") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = TableSegments.class), @ApiResponse(code = 500, + message = "Internal Server error", response = ErrorInfo.class) + }) + public String checkSegmentsReload( + @ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName, + @Context HttpHeaders headers) { + tableName = DatabaseUtils.translateTableName(tableName, headers); + TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName); + boolean needReload = false; + try { + needReload = tableDataManager.needReloadSegments(); + } catch (Exception e) { + throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR); + } + return ResourceUtils.convertToJsonString( + new ServerSegmentsReloadCheckResponse(needReload, tableDataManager.getInstanceId())); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index f4133fee59..19bed50b68 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -239,6 +239,11 @@ public class ControllerRequestURLBuilder { return StringUtil.join("/", _baseUrl, "segments", tableName, query); } + public String forTableNeedReload(String tableNameWithType, boolean verbose) { + String query = String.format("needReload?verbose=%s", verbose); + return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, query); + } + public String forTableRebalanceStatus(String jobId) { return StringUtil.join("/", _baseUrl, "rebalanceStatus", jobId); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org