This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ae97265d86 #12117 Support for Server & Controller API to check for 
Segments reload of a table in servers (#13789)
ae97265d86 is described below

commit ae97265d863ce1dd4c67a85bc8da0120e35fa144
Author: Chaitanya Deepthi <45308220+deepthi...@users.noreply.github.com>
AuthorDate: Wed Aug 28 12:07:28 2024 -0700

    #12117 Support for Server & Controller API to check for Segments reload of 
a table in servers (#13789)
---
 .../ServerSegmentsReloadCheckResponse.java         | 53 +++++++++++++
 .../TableSegmentsReloadCheckResponse.java          | 55 +++++++++++++
 .../utils/SegmentsReloadCheckResponseTest.java     | 90 ++++++++++++++++++++++
 .../api/resources/PinotSegmentRestletResource.java | 43 +++++++++++
 .../controller/helix/ControllerRequestClient.java  | 11 +++
 .../util/ServerSegmentMetadataReader.java          | 42 ++++++++++
 .../pinot/controller/util/TableMetadataReader.java | 22 ++++++
 .../pinot/controller/helix/ControllerTest.java     |  5 ++
 .../core/data/manager/BaseTableDataManager.java    | 26 +++++++
 .../tests/BaseClusterIntegrationTestSet.java       | 48 +++++++++++-
 .../local/data/manager/TableDataManager.java       |  7 ++
 .../immutable/ImmutableSegmentImpl.java            |  9 +++
 .../pinot/server/api/resources/TablesResource.java | 25 ++++++
 .../utils/builder/ControllerRequestURLBuilder.java |  5 ++
 14 files changed, 439 insertions(+), 2 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ServerSegmentsReloadCheckResponse.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ServerSegmentsReloadCheckResponse.java
new file mode 100644
index 0000000000..2469bda404
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ServerSegmentsReloadCheckResponse.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * This class gives the data of a server if there exists any segments that 
need to be reloaded
+ *
+ * It has details of server id and returns true/false if there are any 
segments to be reloaded or not.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ServerSegmentsReloadCheckResponse {
+  @JsonProperty("needReload")
+  private final boolean _needReload;
+
+  @JsonProperty("instanceId")
+  private final String _instanceId;
+
+  public boolean isNeedReload() {
+    return _needReload;
+  }
+
+  public String getInstanceId() {
+    return _instanceId;
+  }
+
+  @JsonCreator
+  public ServerSegmentsReloadCheckResponse(@JsonProperty("needReload") boolean 
needReload,
+      @JsonProperty("instanceId") String instanceId) {
+    _needReload = needReload;
+    _instanceId = instanceId;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableSegmentsReloadCheckResponse.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableSegmentsReloadCheckResponse.java
new file mode 100644
index 0000000000..bd201870a4
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableSegmentsReloadCheckResponse.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+
+
+/**
+ * This class gives list of the details from each server if there exists any 
segments that need to be reloaded
+ *
+ * It has details of reload flag which returns true if reload is needed on 
table and additional details of the
+ * respective servers.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TableSegmentsReloadCheckResponse {
+  @JsonProperty("needReload")
+  boolean _needReload;
+  @JsonProperty("serverToSegmentsCheckReloadList")
+  Map<String, ServerSegmentsReloadCheckResponse> 
_serverToSegmentsCheckReloadList;
+
+  public boolean isNeedReload() {
+    return _needReload;
+  }
+
+  public Map<String, ServerSegmentsReloadCheckResponse> 
getServerToSegmentsCheckReloadList() {
+    return _serverToSegmentsCheckReloadList;
+  }
+
+  @JsonCreator
+  public TableSegmentsReloadCheckResponse(@JsonProperty("needReload") boolean 
needReload,
+      @JsonProperty("serverToSegmentsCheckReloadList")
+      Map<String, ServerSegmentsReloadCheckResponse> 
serverToSegmentsCheckReloadList) {
+    _needReload = needReload;
+    _serverToSegmentsCheckReloadList = serverToSegmentsCheckReloadList;
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentsReloadCheckResponseTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentsReloadCheckResponseTest.java
new file mode 100644
index 0000000000..f63a607a12
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentsReloadCheckResponseTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import 
org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse;
+import 
org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests some of the serializer and deserialization responses from 
SegmentsReloadCheckResponse class
+ * needReload will have to be carefully evaluated
+ */
+public class SegmentsReloadCheckResponseTest {
+
+  @Test
+  public void testSerialization()
+      throws IOException {
+    // Given
+    boolean needReload = true;
+    String instanceId = "instance123";
+    ServerSegmentsReloadCheckResponse response = new 
ServerSegmentsReloadCheckResponse(needReload, instanceId);
+    Map<String, ServerSegmentsReloadCheckResponse> serversResponse = new 
HashMap<>();
+    serversResponse.put(instanceId, response);
+    TableSegmentsReloadCheckResponse tableResponse = new 
TableSegmentsReloadCheckResponse(needReload, serversResponse);
+    String responseString = JsonUtils.objectToPrettyString(response);
+    String tableResponseString = JsonUtils.objectToPrettyString(tableResponse);
+
+    assertNotNull(responseString);
+    assertNotNull(tableResponseString);
+    JsonNode tableResponseJsonNode = 
JsonUtils.stringToJsonNode(tableResponseString);
+    assertTrue(tableResponseJsonNode.get("needReload").asBoolean());
+
+    JsonNode serversList = 
tableResponseJsonNode.get("serverToSegmentsCheckReloadList");
+    JsonNode serverResp = serversList.get("instance123");
+    assertEquals(serverResp.get("instanceId").asText(), "instance123");
+    assertTrue(serverResp.get("needReload").asBoolean());
+
+    assertEquals("{\n" + "  \"needReload\" : true,\n" + "  
\"serverToSegmentsCheckReloadList\" : {\n"
+        + "    \"instance123\" : {\n" + "      \"needReload\" : true,\n" + "   
   \"instanceId\" : \"instance123\"\n"
+        + "    }\n" + "  }\n" + "}", tableResponseString);
+    assertEquals("{\n" + "  \"needReload\" : true,\n" + "  \"instanceId\" : 
\"instance123\"\n" + "}", responseString);
+  }
+
+  @Test
+  public void testDeserialization()
+      throws Exception {
+    String jsonResponse = "{\n" + "  \"needReload\": false,\n" + "  
\"serverToSegmentsCheckReloadList\": {\n"
+        + "    \"Server_10.0.0.215_7050\": {\n" + "      \"needReload\": 
false,\n"
+        + "      \"instanceId\": \"Server_10.0.0.215_7050\"\n" + "    }\n" + " 
 }\n" + "}";
+    JsonNode jsonNode = JsonUtils.stringToJsonNode(jsonResponse);
+    TableSegmentsReloadCheckResponse tableReloadResponse =
+        JsonUtils.stringToObject(jsonResponse, new 
TypeReference<TableSegmentsReloadCheckResponse>() {
+        });
+    // Then
+    assertNotNull(jsonNode);
+    assertFalse(tableReloadResponse.isNeedReload());
+    assertNotNull(tableReloadResponse.getServerToSegmentsCheckReloadList());
+    Map<String, ServerSegmentsReloadCheckResponse> serverSegmentReloadResp =
+        tableReloadResponse.getServerToSegmentsCheckReloadList();
+    
assertEquals(serverSegmentReloadResp.get("Server_10.0.0.215_7050").isNeedReload(),
 false);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index c75469d68c..74c09d8da7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -69,6 +69,8 @@ import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import 
org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse;
+import 
org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.URIUtils;
@@ -822,6 +824,47 @@ public class PinotSegmentRestletResource {
     return segmentsMetadata;
   }
 
+  @GET
+  @Path("segments/{tableNameWithType}/needReload")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", 
action = Actions.Table.GET_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Gets the metadata of reload segments check from 
servers hosting the table", notes =
+      "Returns true if reload is needed on the table from any one of the 
servers")
+  public String getTableReloadMetadata(
+      @ApiParam(value = "Table name with type", required = true, example = 
"myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @QueryParam("verbose") @DefaultValue("false") boolean verbose, @Context 
HttpHeaders headers) {
+    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
+    LOGGER.info("Received a request to check reload for all servers hosting 
segments for table {}", tableNameWithType);
+    try {
+      TableMetadataReader tableMetadataReader =
+          new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
+      Map<String, JsonNode> needReloadMetadata =
+          
tableMetadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType,
+              _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      boolean needReload =
+          needReloadMetadata.values().stream().anyMatch(value -> 
value.get("needReload").booleanValue());
+      Map<String, ServerSegmentsReloadCheckResponse> serverResponses = new 
HashMap<>();
+      TableSegmentsReloadCheckResponse tableNeedReloadResponse;
+      if (verbose) {
+        for (Map.Entry<String, JsonNode> entry : 
needReloadMetadata.entrySet()) {
+          serverResponses.put(entry.getKey(),
+              new 
ServerSegmentsReloadCheckResponse(entry.getValue().get("needReload").booleanValue(),
+                  entry.getValue().get("instanceId").asText()));
+        }
+        tableNeedReloadResponse = new 
TableSegmentsReloadCheckResponse(needReload, serverResponses);
+      } else {
+        tableNeedReloadResponse = new 
TableSegmentsReloadCheckResponse(needReload, serverResponses);
+      }
+      return JsonUtils.objectToPrettyString(tableNeedReloadResponse);
+    } catch (InvalidConfigException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Status.BAD_REQUEST);
+    } catch (IOException ioe) {
+      throw new ControllerApplicationException(LOGGER, "Error parsing Pinot 
server response: " + ioe.getMessage(),
+          Status.INTERNAL_SERVER_ERROR, ioe);
+    }
+  }
+
   @GET
   @Path("segments/{tableName}/zkmetadata")
   @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.GET_METADATA)
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 18676d581d..5f8f7d3190 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -201,6 +201,17 @@ public class ControllerRequestClient {
     }
   }
 
+  public String checkIfReloadIsNeeded(String tableNameWithType, Boolean 
verbose)
+      throws IOException {
+    try {
+      SimpleHttpResponse simpleHttpResponse = 
HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(
+          new 
URI(_controllerRequestURLBuilder.forTableNeedReload(tableNameWithType, 
verbose)), _headers, null));
+      return simpleHttpResponse.getResponse();
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
   public void reloadSegment(String tableName, String segmentName, boolean 
forceReload)
       throws IOException {
     try {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index b3fd851ff4..781140a978 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -214,6 +214,43 @@ public class ServerSegmentMetadataReader {
     return segmentsMetadata;
   }
 
+  /**
+   * This method is called when the API request is to fetch data about segment 
reload of the table.
+   * This method makes a MultiGet call to all servers that host their 
respective segments and gets the results.
+   * This method will return metadata of all the servers along with need 
reload flag.
+   * In future additional details like segments list can also be added
+   */
+  public List<String> getCheckReloadSegmentsFromServer(String 
tableNameWithType, Set<String> serverInstances,
+      BiMap<String, String> endpoints, int timeoutMs) {
+    LOGGER.debug("Checking if reload is needed on segments from servers for 
table {}.", tableNameWithType);
+    List<String> serverURLs = new ArrayList<>();
+    for (String serverInstance : serverInstances) {
+      serverURLs.add(generateCheckReloadSegmentsServerURL(tableNameWithType, 
endpoints.get(serverInstance)));
+    }
+    BiMap<String, String> endpointsToServers = endpoints.inverse();
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
endpointsToServers);
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverURLs, 
tableNameWithType, true, timeoutMs);
+    List<String> serversNeedReloadResponses = new ArrayList<>();
+
+    int failedParses = 0;
+    for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
+      try {
+        serversNeedReloadResponses.add(streamResponse.getValue());
+      } catch (Exception e) {
+        failedParses++;
+        LOGGER.error("Unable to parse server {} response due to an error: ", 
streamResponse.getKey(), e);
+      }
+    }
+    if (failedParses != 0) {
+      LOGGER.error("Unable to parse server {} / {} response due to an error: 
", failedParses, serverURLs.size());
+    }
+
+    LOGGER.debug("Retrieved metadata of reload check from servers.");
+    return serversNeedReloadResponses;
+  }
+
   /**
    * This method is called when the API request is to fetch validDocId 
metadata for a list segments of the given table.
    * This method will pick one server randomly that hosts the target segment 
and fetch the segment metadata result.
@@ -375,6 +412,11 @@ public class ServerSegmentMetadataReader {
     return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint, 
tableNameWithType, segmentName, paramsStr);
   }
 
+  private String generateCheckReloadSegmentsServerURL(String 
tableNameWithType, String endpoint) {
+    tableNameWithType = URLEncoder.encode(tableNameWithType, 
StandardCharsets.UTF_8);
+    return String.format("%s/tables/%s/segments/needReload", endpoint, 
tableNameWithType);
+  }
+
   @Deprecated
   private String generateValidDocIdsURL(String tableNameWithType, String 
segmentName, String validDocIdsType,
       String endpoint) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index adf7e3a7b7..a7a53d421d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -23,6 +23,7 @@ import com.google.common.collect.BiMap;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -33,7 +34,9 @@ import 
org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
 /**
@@ -55,6 +58,25 @@ public class TableMetadataReader {
     _pinotHelixResourceManager = helixResourceManager;
   }
 
+  public Map<String, JsonNode> getServerCheckSegmentsReloadMetadata(String 
tableNameWithType, int timeoutMs)
+      throws InvalidConfigException, IOException {
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    List<String> serverInstances = 
_pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType, 
tableType);
+    Set<String> serverInstanceSet = new HashSet<>(serverInstances);
+    BiMap<String, String> endpoints = 
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverInstanceSet);
+    ServerSegmentMetadataReader serverSegmentMetadataReader =
+        new ServerSegmentMetadataReader(_executor, _connectionManager);
+    List<String> segmentsMetadata =
+        
serverSegmentMetadataReader.getCheckReloadSegmentsFromServer(tableNameWithType, 
serverInstanceSet, endpoints,
+            timeoutMs);
+    Map<String, JsonNode> response = new HashMap<>();
+    for (String segmentMetadata : segmentsMetadata) {
+      JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
+      response.put(responseJson.get("instanceId").asText(), responseJson);
+    }
+    return response;
+  }
+
   /**
    * This api takes in list of segments for which we need the metadata.
    */
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 466de32d25..98ddbac73a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -721,6 +721,11 @@ public class ControllerTest {
     return getControllerRequestClient().reloadTable(tableName, 
TableType.OFFLINE, forceDownload);
   }
 
+  public String checkIfReloadIsNeeded(String tableNameWithType, Boolean 
verbose)
+      throws IOException {
+    return 
getControllerRequestClient().checkIfReloadIsNeeded(tableNameWithType, verbose);
+  }
+
   public void reloadOfflineSegment(String tableName, String segmentName, 
boolean forceDownload)
       throws IOException {
     getControllerRequestClient().reloadSegment(tableName, segmentName, 
forceDownload);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 4f24dc3d52..56d2cb35d6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -61,6 +61,7 @@ import 
org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -1024,6 +1025,31 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     }
   }
 
+  @Override
+  public boolean needReloadSegments()
+      throws Exception {
+    IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
+    List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
+    boolean needReload = false;
+    try {
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        IndexSegment segment = segmentDataManager.getSegment();
+        if (segment instanceof ImmutableSegmentImpl) {
+          ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) 
segment;
+          if (immutableSegment.isReloadNeeded(indexLoadingConfig)) {
+            needReload = true;
+            break;
+          }
+        }
+      }
+    } finally {
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        releaseSegment(segmentDataManager);
+      }
+    }
+    return needReload;
+  }
+
   private SegmentDirectory initSegmentDirectory(String segmentName, String 
segmentCrc,
       IndexLoadingConfig indexLoadingConfig)
       throws Exception {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 09310cb243..ca27ed4ef4 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -732,11 +732,34 @@ public abstract class BaseClusterIntegrationTestSet 
extends BaseClusterIntegrati
 
     // Upload the schema with extra columns
     addSchema(schema);
-
+    String tableNameWithTypeOffline = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+    String tableNameWithTypeRealtime = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
     // Reload the table
     if (includeOfflineTable) {
+      //test controller api which gives responses if reload is needed on any 
of the server segments when default
+      // columns are added
+      String needBeforeReloadResponseWithNoVerbose = 
checkIfReloadIsNeeded(tableNameWithTypeOffline, false);
+      String needBeforeReloadResponseWithVerbose = 
checkIfReloadIsNeeded(tableNameWithTypeOffline, true);
+      JsonNode jsonNeedReloadResponseWithNoVerbose = 
JsonUtils.stringToJsonNode(needBeforeReloadResponseWithNoVerbose);
+      JsonNode jsonNeedReloadResponseWithVerbose = 
JsonUtils.stringToJsonNode(needBeforeReloadResponseWithVerbose);
+      //test to check if reload is needed i.e true
+      
assertTrue(jsonNeedReloadResponseWithNoVerbose.get("needReload").asBoolean());
+      
assertTrue(jsonNeedReloadResponseWithVerbose.get("needReload").asBoolean());
+      
assertFalse(jsonNeedReloadResponseWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty());
       reloadOfflineTable(rawTableName);
     }
+    //test controller api which gives responses if reload is needed on any of 
the server segments when default
+    // columns are added
+    String needBeforeReloadResponseRealtimeWithNoVerbose = 
checkIfReloadIsNeeded(tableNameWithTypeRealtime, false);
+    String needBeforeReloadResponseRealtimeWithVerbose = 
checkIfReloadIsNeeded(tableNameWithTypeRealtime, true);
+    JsonNode jsonNeedReloadResponseRealTimeWithNoVerbose =
+        
JsonUtils.stringToJsonNode(needBeforeReloadResponseRealtimeWithNoVerbose);
+    JsonNode jsonNeedReloadResponseRealTimeWithVerbose =
+        
JsonUtils.stringToJsonNode(needBeforeReloadResponseRealtimeWithVerbose);
+    //test to check if reload is needed i.e true
+    
assertTrue(jsonNeedReloadResponseRealTimeWithNoVerbose.get("needReload").asBoolean());
+    
assertTrue(jsonNeedReloadResponseRealTimeWithVerbose.get("needReload").asBoolean());
+    
assertFalse(jsonNeedReloadResponseRealTimeWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty());
     reloadRealtimeTable(rawTableName);
 
     // Wait for all segments to finish reloading, and test querying the new 
columns
@@ -762,7 +785,6 @@ public abstract class BaseClusterIntegrationTestSet extends 
BaseClusterIntegrati
     JsonNode resultTable = queryResponse.get("resultTable");
     assertEquals(resultTable.get("dataSchema").get("columnNames").size(), 
schema.size());
     assertEquals(resultTable.get("rows").size(), 10);
-
     // Test aggregation query to include querying all segemnts (including 
realtime)
     String aggregationQuery = "SELECT SUMMV(NewIntMVDimension) FROM " + 
rawTableName;
     queryResponse = postQuery(aggregationQuery);
@@ -778,6 +800,28 @@ public abstract class BaseClusterIntegrationTestSet 
extends BaseClusterIntegrati
     queryResponse = postQuery(countStarQuery);
     assertEquals(queryResponse.get("exceptions").size(), 0);
     
assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asLong(),
 countStarResult);
+    if (includeOfflineTable) {
+      String needAfterReloadResponseWithNoVerbose = 
checkIfReloadIsNeeded(tableNameWithTypeOffline, false);
+      String needAfterReloadResponseWithVerbose = 
checkIfReloadIsNeeded(tableNameWithTypeOffline, true);
+      JsonNode jsonNeedReloadResponseAfterWithNoVerbose =
+          JsonUtils.stringToJsonNode(needAfterReloadResponseWithNoVerbose);
+      JsonNode jsonNeedReloadResponseAfterWithVerbose = 
JsonUtils.stringToJsonNode(needAfterReloadResponseWithVerbose);
+      //test to check if reload on offline table is needed i.e false after 
reload is finished
+      
assertFalse(jsonNeedReloadResponseAfterWithNoVerbose.get("needReload").asBoolean());
+      
assertFalse(jsonNeedReloadResponseAfterWithVerbose.get("needReload").asBoolean());
+      
assertFalse(jsonNeedReloadResponseRealTimeWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty());
+    }
+    String needAfterReloadResponseRealtimeWithNoVerbose = 
checkIfReloadIsNeeded(tableNameWithTypeRealtime, false);
+    String needAfterReloadResponseRealTimeWithVerbose = 
checkIfReloadIsNeeded(tableNameWithTypeRealtime, true);
+    JsonNode jsonNeedReloadResponseRealtimeAfterWithNoVerbose =
+        
JsonUtils.stringToJsonNode(needAfterReloadResponseRealtimeWithNoVerbose);
+    JsonNode jsonNeedReloadResponseRealtimeAfterWithVerbose =
+        JsonUtils.stringToJsonNode(needAfterReloadResponseRealTimeWithVerbose);
+
+    //test to check if reload on real time table is needed i.e false after 
reload is finished
+    
assertFalse(jsonNeedReloadResponseRealtimeAfterWithNoVerbose.get("needReload").asBoolean());
+    
assertFalse(jsonNeedReloadResponseRealtimeAfterWithVerbose.get("needReload").asBoolean());
+    
assertFalse(jsonNeedReloadResponseRealtimeAfterWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty());
   }
 
   private DimensionFieldSpec constructNewDimension(FieldSpec.DataType 
dataType, boolean singleValue) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 677d659fff..480b2ba70b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -122,6 +122,13 @@ public interface TableDataManager {
    */
   boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, 
IndexLoadingConfig indexLoadingConfig);
 
+  /**
+   * Check if reload is needed for any of the segments of a table
+   * @return true if reload is needed for any of the segments and false 
otherwise
+   */
+  boolean needReloadSegments()
+      throws Exception;
+
   /**
    * Downloads a segment and loads it into the table.
    * NOTE: This method is part of the implementation detail of {@link 
#addOnlineSegment(String)}.
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 76129dabb8..14546d7ba6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -33,6 +33,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
 import 
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer;
@@ -173,6 +174,14 @@ public class ImmutableSegmentImpl implements 
ImmutableSegment {
         V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
   }
 
+  /**
+   * if re processing or reload is needed on a segment then return true
+   */
+  public boolean isReloadNeeded(IndexLoadingConfig indexLoadingConfig)
+      throws Exception {
+    return ImmutableSegmentLoader.needPreprocess(_segmentDirectory, 
indexLoadingConfig, indexLoadingConfig.getSchema());
+  }
+
   @Override
   public <I extends IndexReader> I getIndex(String column, IndexType<?, I, ?> 
type) {
     ColumnIndexContainer container = _indexContainerMap.get(column);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 228ba2277b..ce85ec3f31 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -64,6 +64,7 @@ import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.response.server.TableIndexMetadataResponse;
 import org.apache.pinot.common.restlet.resources.ResourceUtils;
 import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
+import 
org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse;
 import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
 import org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo;
 import org.apache.pinot.common.restlet.resources.TableSegments;
@@ -954,4 +955,28 @@ public class TablesResource {
     }
     return new TableSegmentValidationInfo(true, maxEndTimeMs);
   }
+
+  @GET
+  @Path("/tables/{tableName}/segments/needReload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Checks if reload is needed on any segment", notes = 
"Returns true if reload is required on"
+      + " any segment in this server")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success", response = 
TableSegments.class), @ApiResponse(code = 500,
+      message = "Internal Server error", response = ErrorInfo.class)
+  })
+  public String checkSegmentsReload(
+      @ApiParam(value = "Table Name with type", required = true) 
@PathParam("tableName") String tableName,
+      @Context HttpHeaders headers) {
+    tableName = DatabaseUtils.translateTableName(tableName, headers);
+    TableDataManager tableDataManager = 
ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName);
+    boolean needReload = false;
+    try {
+      needReload = tableDataManager.needReloadSegments();
+    } catch (Exception e) {
+      throw new WebApplicationException(e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    return ResourceUtils.convertToJsonString(
+        new ServerSegmentsReloadCheckResponse(needReload, 
tableDataManager.getInstanceId()));
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index f4133fee59..19bed50b68 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -239,6 +239,11 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "segments", tableName, query);
   }
 
+  public String forTableNeedReload(String tableNameWithType, boolean verbose) {
+    String query = String.format("needReload?verbose=%s", verbose);
+    return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, 
query);
+  }
+
   public String forTableRebalanceStatus(String jobId) {
     return StringUtil.join("/", _baseUrl, "rebalanceStatus", jobId);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to