npawar commented on a change in pull request #5718:
URL: https://github.com/apache/incubator-pinot/pull/5718#discussion_r460472217



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -485,4 +493,91 @@ private void deleteSegmentsInternal(String 
tableNameWithType, List<String> segme
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.FORBIDDEN);
     }
   }
+
+  @GET
+  @Path("segments/{tableName}/reload-status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Status of segment reload", notes = "Status of segment 
reload")
+  public Map<String, TableMetadataReader.TableReloadStatus> getReloadStatus(
+          @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+          @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr) {
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+    Map<String, TableMetadataReader.TableReloadStatus> reloadStatusMap = new 
HashMap<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      TableMetadataReader.TableReloadStatus tableReloadStatus = null;
+      try {
+        tableReloadStatus = getSegmentsReloadStatus(tableNameWithType);
+      } catch (InvalidConfigException e) {
+        throw new ControllerApplicationException(LOGGER,
+                "Failed to load segment reload status for table: " + 
tableName, Status.NOT_FOUND);
+      }
+      if (Objects.isNull(tableReloadStatus))
+        throw new ControllerApplicationException(LOGGER,
+                "Table: " + tableName + " not found.", Status.NOT_FOUND);
+      reloadStatusMap.put(tableNameWithType, tableReloadStatus);
+    }
+    return reloadStatusMap;
+  }
+
+  private TableMetadataReader.TableReloadStatus getSegmentsReloadStatus(String 
tableNameWithType)
+          throws InvalidConfigException {
+    final Map<String, List<String>> serversToSegmentsMap =
+            
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    TableMetadataReader tableMetadataReader =
+            new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
+    return tableMetadataReader.getReloadStatus(tableNameWithType, 
serversToSegmentsMap,
+            _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+  }
+
+  @GET
+  @Path("segments/{tableName}/metadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the metadata for a segment", notes = "Get the 
metadata for a segment")

Review comment:
       Let's specify that this metadata includes metadata from server. Was this 
the api name that we finally settled on? It's too similar to the other 
"metadata" endpoint and bound to cause confusion 

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerSegmentMetadataReader.java
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.URI;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.pinot.common.http.MultiGetRequest;
+import org.apache.pinot.common.restlet.resources.SegmentStatus;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServerSegmentMetadataReader {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerSegmentMetadataReader.class);
+
+  private final Executor _executor;
+  private final HttpConnectionManager _connectionManager;
+
+  public ServerSegmentMetadataReader(Executor executor, HttpConnectionManager 
connectionManager) {
+    _executor = executor;
+    _connectionManager = connectionManager;
+  }
+
+  public List<String> getSegmentMetadataFromServer(String tableNameWithType,
+                                                   Map<String, List<String>> 
serversToSegmentsMap,
+                                                   BiMap<String, String> 
endpoints, int timeoutMs) {
+    LOGGER.info("Reading segment metadata from servers for table {}.", 
tableNameWithType);
+    List<String> serverURLs = new ArrayList<>();
+    for (Map.Entry<String, List<String>> serverToSegments : 
serversToSegmentsMap.entrySet()) {
+      List<String> segments = serverToSegments.getValue();
+      for (String segment : segments) {
+        serverURLs.add(generateSegmentMetadataServerURL(tableNameWithType, 
segment, endpoints.get(serverToSegments.getKey())));
+      }
+    }
+    CompletionService<GetMethod> completionService =
+            new MultiGetRequest(_executor, 
_connectionManager).execute(serverURLs, timeoutMs);
+    List<String> segmentsMetadata = new ArrayList<>();
+
+    BiMap<String, String> endpointsToServers = endpoints.inverse();
+    for (int i = 0; i < serverURLs.size(); i++) {
+      GetMethod getMethod = null;
+      try {
+        getMethod = completionService.take().get();
+        URI uri = getMethod.getURI();
+        String instance = endpointsToServers.get(uri.getHost() + ":" + 
uri.getPort());
+        if (getMethod.getStatusCode() >= 300) {
+          LOGGER.error("Server: {} returned error: {}", instance, 
getMethod.getStatusCode());
+          continue;
+        }
+        JsonNode segmentMetadata =
+                
JsonUtils.inputStreamToJsonNode(getMethod.getResponseBodyAsStream());
+        segmentsMetadata.add(JsonUtils.objectToString(segmentMetadata));
+        LOGGER.info("Updated segment metadata: {}", segmentMetadata.size());
+      } catch (Exception e) {
+        // Ignore individual exceptions because the exception has been logged 
in MultiGetRequest
+        // Log the number of failed servers after gathering all responses
+      } finally {
+        if (Objects.nonNull(getMethod)) {
+          getMethod.releaseConnection();
+        }
+      }
+    }
+    return segmentsMetadata;
+  }
+
+  private String generateSegmentMetadataServerURL(String tableNameWithType, 
String segmentName, String endpoint) {
+    return "http://"; + endpoint + "/tables/" + tableNameWithType + 
"/segments/" + segmentName + "/metadata";
+  }
+
+  private String generateReloadStatusServerURL(String tableNameWithType, 
String segmentName, String endpoint) {
+    return "http://"; + endpoint + "/tables/" + tableNameWithType + 
"/segments/" + segmentName + "/reload-status";
+  }
+
+  public List<SegmentStatus> getSegmentReloadTime(String tableNameWithType,
+                                                  Map<String, List<String>> 
serversToSegmentsMap,
+                                                  BiMap<String, String> 
endpoints, int timeoutMs) {
+    LOGGER.info("Reading segment reload status from servers for table {}.", 
tableNameWithType);
+    List<String> serverURLs = new ArrayList<>();
+    for (Map.Entry<String, List<String>> serverToSegments : 
serversToSegmentsMap.entrySet()) {
+      List<String> segments = serverToSegments.getValue();
+      for (String segment : segments) {
+        serverURLs.add(generateReloadStatusServerURL(tableNameWithType, 
segment, endpoints.get(serverToSegments.getKey())));
+      }
+    }
+    CompletionService<GetMethod> completionService =
+            new MultiGetRequest(_executor, 
_connectionManager).execute(serverURLs, timeoutMs);
+    BiMap<String, String> endpointsToServers = endpoints.inverse();
+    List<SegmentStatus> segmentsStatus = new ArrayList<>();
+
+    for (int i = 0; i < serverURLs.size(); i++) {
+      GetMethod getMethod = null;
+      try {
+        getMethod = completionService.take().get();
+        URI uri = getMethod.getURI();
+        String instance = endpointsToServers.get(uri.getHost() + ":" + 
uri.getPort());
+        if (getMethod.getStatusCode() >= 300) {
+          LOGGER.error("Server: {} returned error: {}", instance, 
getMethod.getStatusCode());

Review comment:
       how about logging the message also here?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -485,4 +493,91 @@ private void deleteSegmentsInternal(String 
tableNameWithType, List<String> segme
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.FORBIDDEN);
     }
   }
+
+  @GET
+  @Path("segments/{tableName}/reload-status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Status of segment reload", notes = "Status of segment 
reload")
+  public Map<String, TableMetadataReader.TableReloadStatus> getReloadStatus(
+          @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+          @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr) {
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+    Map<String, TableMetadataReader.TableReloadStatus> reloadStatusMap = new 
HashMap<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      TableMetadataReader.TableReloadStatus tableReloadStatus = null;
+      try {
+        tableReloadStatus = getSegmentsReloadStatus(tableNameWithType);
+      } catch (InvalidConfigException e) {
+        throw new ControllerApplicationException(LOGGER,
+                "Failed to load segment reload status for table: " + 
tableName, Status.NOT_FOUND);
+      }
+      if (Objects.isNull(tableReloadStatus))
+        throw new ControllerApplicationException(LOGGER,
+                "Table: " + tableName + " not found.", Status.NOT_FOUND);
+      reloadStatusMap.put(tableNameWithType, tableReloadStatus);
+    }
+    return reloadStatusMap;
+  }
+
+  private TableMetadataReader.TableReloadStatus getSegmentsReloadStatus(String 
tableNameWithType)
+          throws InvalidConfigException {
+    final Map<String, List<String>> serversToSegmentsMap =
+            
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    TableMetadataReader tableMetadataReader =
+            new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
+    return tableMetadataReader.getReloadStatus(tableNameWithType, 
serversToSegmentsMap,
+            _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+  }
+
+  @GET
+  @Path("segments/{tableName}/metadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the metadata for a segment", notes = "Get the 
metadata for a segment")
+  public Map<String, String> getServerMetadata(@ApiParam(value = "Name of the 
table", required = true) @PathParam("tableName") String tableName,
+                                               @ApiParam(value = 
"OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr) {
+    TableType tableType = Constants.validateTableType(tableTypeStr);
+    if (tableType == TableType.REALTIME)
+      throw new ControllerApplicationException(LOGGER,
+              "Table type : " + tableTypeStr + " not yet supported.", 
Status.NOT_IMPLEMENTED);
+
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+    Map<String, String> segmentsMetadata = null;
+    try {
+      segmentsMetadata = 
getSegmentsMetadataFromServer(tableNamesWithType.get(0));
+    } catch (InvalidConfigException e) {
+      throw new ControllerApplicationException(LOGGER,
+              "Failed to load segment reload status for table: " + tableName, 
Status.NOT_FOUND);
+    } catch (IOException ioe) {
+      throw new ControllerApplicationException(LOGGER,
+              "Error parsing response to cluster config!", 
Response.Status.BAD_REQUEST, ioe);
+    }
+    if (segmentsMetadata == null)

Review comment:
       if table is not found, line 546 itself will throw exception. 

##########
File path: 
pinot-server/src/main/java/org/apache/pinot/server/api/resources/SegmentColumnIndexesFetcher.java
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+public class SegmentColumnIndexesFetcher {
+  public static JsonNode getIndexesForSegmentColumns(SegmentDataManager 
segmentDataManager, Set<String> columnSet) {
+    ArrayNode columnsIndexMetadata = JsonUtils.newArrayNode();
+    if (segmentDataManager instanceof ImmutableSegmentDataManager) {
+      ImmutableSegmentDataManager immutableSegmentDataManager = 
(ImmutableSegmentDataManager) segmentDataManager;
+      ImmutableSegment immutableSegment = 
immutableSegmentDataManager.getSegment();
+      if (immutableSegment instanceof ImmutableSegmentImpl) {
+        ImmutableSegmentImpl immutableSegmentImpl = (ImmutableSegmentImpl) 
immutableSegment;
+//        Set<String> columns = 
immutableSegmentImpl.getSegmentMetadata().getAllColumns();
+        Map<String, ColumnIndexContainer> columnIndexContainerMap = 
immutableSegmentImpl.getIndexContainerMap();
+        
columnsIndexMetadata.add(getImmutableSegmentColumnIndexes(columnIndexContainerMap,
 columnSet));
+      }
+    }
+    return columnsIndexMetadata;
+  }
+
+  private static ObjectNode getImmutableSegmentColumnIndexes(Map<String, 
ColumnIndexContainer> columnIndexContainerMap,
+                                                             Set<String> 
columnSet) {
+    ObjectNode columnIndexMap = JsonUtils.newObjectNode();
+
+    for (Map.Entry<String, ColumnIndexContainer> e : 
columnIndexContainerMap.entrySet()) {
+      if (columnSet != null && !columnSet.contains(e.getKey())) {
+        continue;
+      }
+      ColumnIndexContainer columnIndexContainer = e.getValue();
+      ObjectNode indexesNode = JsonUtils.newObjectNode();

Review comment:
       nit: wrap all the if else using {}
   create private static final constants for the keys and values that are being 
put in indexesNode.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -485,4 +493,91 @@ private void deleteSegmentsInternal(String 
tableNameWithType, List<String> segme
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.FORBIDDEN);
     }
   }
+
+  @GET
+  @Path("segments/{tableName}/reload-status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Status of segment reload", notes = "Status of segment 
reload")
+  public Map<String, TableMetadataReader.TableReloadStatus> getReloadStatus(
+          @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+          @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr) {
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+    Map<String, TableMetadataReader.TableReloadStatus> reloadStatusMap = new 
HashMap<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      TableMetadataReader.TableReloadStatus tableReloadStatus = null;
+      try {
+        tableReloadStatus = getSegmentsReloadStatus(tableNameWithType);
+      } catch (InvalidConfigException e) {
+        throw new ControllerApplicationException(LOGGER,
+                "Failed to load segment reload status for table: " + 
tableName, Status.NOT_FOUND);

Review comment:
       is there a better return code for this? NOT_FOUND suggests that the 
table was not found

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerSegmentMetadataReader.java
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.URI;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.pinot.common.http.MultiGetRequest;
+import org.apache.pinot.common.restlet.resources.SegmentStatus;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServerSegmentMetadataReader {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerSegmentMetadataReader.class);
+
+  private final Executor _executor;
+  private final HttpConnectionManager _connectionManager;
+
+  public ServerSegmentMetadataReader(Executor executor, HttpConnectionManager 
connectionManager) {
+    _executor = executor;
+    _connectionManager = connectionManager;
+  }
+
+  public List<String> getSegmentMetadataFromServer(String tableNameWithType,
+                                                   Map<String, List<String>> 
serversToSegmentsMap,
+                                                   BiMap<String, String> 
endpoints, int timeoutMs) {
+    LOGGER.info("Reading segment metadata from servers for table {}.", 
tableNameWithType);
+    List<String> serverURLs = new ArrayList<>();
+    for (Map.Entry<String, List<String>> serverToSegments : 
serversToSegmentsMap.entrySet()) {
+      List<String> segments = serverToSegments.getValue();
+      for (String segment : segments) {
+        serverURLs.add(generateSegmentMetadataServerURL(tableNameWithType, 
segment, endpoints.get(serverToSegments.getKey())));
+      }
+    }
+    CompletionService<GetMethod> completionService =
+            new MultiGetRequest(_executor, 
_connectionManager).execute(serverURLs, timeoutMs);
+    List<String> segmentsMetadata = new ArrayList<>();
+
+    BiMap<String, String> endpointsToServers = endpoints.inverse();
+    for (int i = 0; i < serverURLs.size(); i++) {
+      GetMethod getMethod = null;
+      try {
+        getMethod = completionService.take().get();
+        URI uri = getMethod.getURI();
+        String instance = endpointsToServers.get(uri.getHost() + ":" + 
uri.getPort());
+        if (getMethod.getStatusCode() >= 300) {
+          LOGGER.error("Server: {} returned error: {}", instance, 
getMethod.getStatusCode());
+          continue;
+        }
+        JsonNode segmentMetadata =
+                
JsonUtils.inputStreamToJsonNode(getMethod.getResponseBodyAsStream());
+        segmentsMetadata.add(JsonUtils.objectToString(segmentMetadata));
+        LOGGER.info("Updated segment metadata: {}", segmentMetadata.size());
+      } catch (Exception e) {
+        // Ignore individual exceptions because the exception has been logged 
in MultiGetRequest
+        // Log the number of failed servers after gathering all responses
+      } finally {
+        if (Objects.nonNull(getMethod)) {
+          getMethod.releaseConnection();
+        }
+      }
+    }
+    return segmentsMetadata;
+  }
+
+  private String generateSegmentMetadataServerURL(String tableNameWithType, 
String segmentName, String endpoint) {
+    return "http://"; + endpoint + "/tables/" + tableNameWithType + 
"/segments/" + segmentName + "/metadata";
+  }
+
+  private String generateReloadStatusServerURL(String tableNameWithType, 
String segmentName, String endpoint) {
+    return "http://"; + endpoint + "/tables/" + tableNameWithType + 
"/segments/" + segmentName + "/reload-status";

Review comment:
       nit: How about String.format for constructing these

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -485,4 +493,91 @@ private void deleteSegmentsInternal(String 
tableNameWithType, List<String> segme
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.FORBIDDEN);
     }
   }
+
+  @GET
+  @Path("segments/{tableName}/reload-status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Status of segment reload", notes = "Status of segment 
reload")
+  public Map<String, TableMetadataReader.TableReloadStatus> getReloadStatus(
+          @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+          @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr) {
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));

Review comment:
       some validation to check if table was not found?
   same for the metadata endpoint

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerSegmentMetadataReader.java
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.URI;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.pinot.common.http.MultiGetRequest;
+import org.apache.pinot.common.restlet.resources.SegmentStatus;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServerSegmentMetadataReader {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerSegmentMetadataReader.class);
+
+  private final Executor _executor;
+  private final HttpConnectionManager _connectionManager;
+
+  public ServerSegmentMetadataReader(Executor executor, HttpConnectionManager 
connectionManager) {
+    _executor = executor;
+    _connectionManager = connectionManager;
+  }
+
+  public List<String> getSegmentMetadataFromServer(String tableNameWithType,
+                                                   Map<String, List<String>> 
serversToSegmentsMap,
+                                                   BiMap<String, String> 
endpoints, int timeoutMs) {
+    LOGGER.info("Reading segment metadata from servers for table {}.", 
tableNameWithType);
+    List<String> serverURLs = new ArrayList<>();
+    for (Map.Entry<String, List<String>> serverToSegments : 
serversToSegmentsMap.entrySet()) {
+      List<String> segments = serverToSegments.getValue();
+      for (String segment : segments) {
+        serverURLs.add(generateSegmentMetadataServerURL(tableNameWithType, 
segment, endpoints.get(serverToSegments.getKey())));
+      }
+    }
+    CompletionService<GetMethod> completionService =
+            new MultiGetRequest(_executor, 
_connectionManager).execute(serverURLs, timeoutMs);
+    List<String> segmentsMetadata = new ArrayList<>();
+
+    BiMap<String, String> endpointsToServers = endpoints.inverse();
+    for (int i = 0; i < serverURLs.size(); i++) {
+      GetMethod getMethod = null;
+      try {
+        getMethod = completionService.take().get();
+        URI uri = getMethod.getURI();
+        String instance = endpointsToServers.get(uri.getHost() + ":" + 
uri.getPort());
+        if (getMethod.getStatusCode() >= 300) {
+          LOGGER.error("Server: {} returned error: {}", instance, 
getMethod.getStatusCode());
+          continue;
+        }
+        JsonNode segmentMetadata =
+                
JsonUtils.inputStreamToJsonNode(getMethod.getResponseBodyAsStream());
+        segmentsMetadata.add(JsonUtils.objectToString(segmentMetadata));
+        LOGGER.info("Updated segment metadata: {}", segmentMetadata.size());
+      } catch (Exception e) {
+        // Ignore individual exceptions because the exception has been logged 
in MultiGetRequest
+        // Log the number of failed servers after gathering all responses
+      } finally {
+        if (Objects.nonNull(getMethod)) {
+          getMethod.releaseConnection();
+        }
+      }
+    }
+    return segmentsMetadata;
+  }
+
+  private String generateSegmentMetadataServerURL(String tableNameWithType, 
String segmentName, String endpoint) {
+    return "http://"; + endpoint + "/tables/" + tableNameWithType + 
"/segments/" + segmentName + "/metadata";
+  }
+
+  private String generateReloadStatusServerURL(String tableNameWithType, 
String segmentName, String endpoint) {
+    return "http://"; + endpoint + "/tables/" + tableNameWithType + 
"/segments/" + segmentName + "/reload-status";
+  }
+
+  public List<SegmentStatus> getSegmentReloadTime(String tableNameWithType,
+                                                  Map<String, List<String>> 
serversToSegmentsMap,

Review comment:
       nit: for readability
   s/serversToSegmentsMap/serverToSegments
   s/endpoints/serverToEndpoint

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -485,4 +493,91 @@ private void deleteSegmentsInternal(String 
tableNameWithType, List<String> segme
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.FORBIDDEN);
     }
   }
+
+  @GET
+  @Path("segments/{tableName}/reload-status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Status of segment reload", notes = "Status of segment 
reload")
+  public Map<String, TableMetadataReader.TableReloadStatus> getReloadStatus(
+          @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+          @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr) {
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+    Map<String, TableMetadataReader.TableReloadStatus> reloadStatusMap = new 
HashMap<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      TableMetadataReader.TableReloadStatus tableReloadStatus = null;
+      try {
+        tableReloadStatus = getSegmentsReloadStatus(tableNameWithType);
+      } catch (InvalidConfigException e) {
+        throw new ControllerApplicationException(LOGGER,
+                "Failed to load segment reload status for table: " + 
tableName, Status.NOT_FOUND);
+      }
+      if (Objects.isNull(tableReloadStatus))
+        throw new ControllerApplicationException(LOGGER,
+                "Table: " + tableName + " not found.", Status.NOT_FOUND);
+      reloadStatusMap.put(tableNameWithType, tableReloadStatus);
+    }
+    return reloadStatusMap;
+  }
+
+  private TableMetadataReader.TableReloadStatus getSegmentsReloadStatus(String 
tableNameWithType)
+          throws InvalidConfigException {
+    final Map<String, List<String>> serversToSegmentsMap =
+            
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    TableMetadataReader tableMetadataReader =
+            new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
+    return tableMetadataReader.getReloadStatus(tableNameWithType, 
serversToSegmentsMap,
+            _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+  }
+
+  @GET
+  @Path("segments/{tableName}/metadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the metadata for a segment", notes = "Get the 
metadata for a segment")
+  public Map<String, String> getServerMetadata(@ApiParam(value = "Name of the 
table", required = true) @PathParam("tableName") String tableName,
+                                               @ApiParam(value = 
"OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr) {
+    TableType tableType = Constants.validateTableType(tableTypeStr);
+    if (tableType == TableType.REALTIME)

Review comment:
       nit: wrap this in {}

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -485,4 +493,91 @@ private void deleteSegmentsInternal(String 
tableNameWithType, List<String> segme
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.FORBIDDEN);
     }
   }
+
+  @GET
+  @Path("segments/{tableName}/reload-status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Status of segment reload", notes = "Status of segment 
reload")
+  public Map<String, TableMetadataReader.TableReloadStatus> getReloadStatus(
+          @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+          @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr) {
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+    Map<String, TableMetadataReader.TableReloadStatus> reloadStatusMap = new 
HashMap<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      TableMetadataReader.TableReloadStatus tableReloadStatus = null;
+      try {
+        tableReloadStatus = getSegmentsReloadStatus(tableNameWithType);
+      } catch (InvalidConfigException e) {
+        throw new ControllerApplicationException(LOGGER,
+                "Failed to load segment reload status for table: " + 
tableName, Status.NOT_FOUND);
+      }
+      if (Objects.isNull(tableReloadStatus))
+        throw new ControllerApplicationException(LOGGER,
+                "Table: " + tableName + " not found.", Status.NOT_FOUND);
+      reloadStatusMap.put(tableNameWithType, tableReloadStatus);
+    }
+    return reloadStatusMap;
+  }
+
+  private TableMetadataReader.TableReloadStatus getSegmentsReloadStatus(String 
tableNameWithType)

Review comment:
       In the TableReloadStatus, can we also add a message telling the caller 
how many segments failed to report?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -485,4 +493,91 @@ private void deleteSegmentsInternal(String 
tableNameWithType, List<String> segme
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.FORBIDDEN);
     }
   }
+
+  @GET
+  @Path("segments/{tableName}/reload-status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Status of segment reload", notes = "Status of segment 
reload")
+  public Map<String, TableMetadataReader.TableReloadStatus> getReloadStatus(
+          @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+          @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr) {
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+    Map<String, TableMetadataReader.TableReloadStatus> reloadStatusMap = new 
HashMap<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      TableMetadataReader.TableReloadStatus tableReloadStatus = null;
+      try {
+        tableReloadStatus = getSegmentsReloadStatus(tableNameWithType);
+      } catch (InvalidConfigException e) {
+        throw new ControllerApplicationException(LOGGER,
+                "Failed to load segment reload status for table: " + 
tableName, Status.NOT_FOUND);
+      }
+      if (Objects.isNull(tableReloadStatus))

Review comment:
       can tableReloadStatus ever be null? you'll either catch exception above, 
or have a non-null reload status

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -485,4 +493,91 @@ private void deleteSegmentsInternal(String 
tableNameWithType, List<String> segme
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.FORBIDDEN);
     }
   }
+
+  @GET
+  @Path("segments/{tableName}/reload-status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Status of segment reload", notes = "Status of segment 
reload")
+  public Map<String, TableMetadataReader.TableReloadStatus> getReloadStatus(
+          @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+          @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr) {
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+    Map<String, TableMetadataReader.TableReloadStatus> reloadStatusMap = new 
HashMap<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      TableMetadataReader.TableReloadStatus tableReloadStatus = null;
+      try {
+        tableReloadStatus = getSegmentsReloadStatus(tableNameWithType);
+      } catch (InvalidConfigException e) {
+        throw new ControllerApplicationException(LOGGER,
+                "Failed to load segment reload status for table: " + 
tableName, Status.NOT_FOUND);
+      }
+      if (Objects.isNull(tableReloadStatus))
+        throw new ControllerApplicationException(LOGGER,
+                "Table: " + tableName + " not found.", Status.NOT_FOUND);
+      reloadStatusMap.put(tableNameWithType, tableReloadStatus);
+    }
+    return reloadStatusMap;
+  }
+
+  private TableMetadataReader.TableReloadStatus getSegmentsReloadStatus(String 
tableNameWithType)
+          throws InvalidConfigException {
+    final Map<String, List<String>> serversToSegmentsMap =
+            
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    TableMetadataReader tableMetadataReader =
+            new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
+    return tableMetadataReader.getReloadStatus(tableNameWithType, 
serversToSegmentsMap,
+            _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+  }
+
+  @GET
+  @Path("segments/{tableName}/metadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the metadata for a segment", notes = "Get the 
metadata for a segment")
+  public Map<String, String> getServerMetadata(@ApiParam(value = "Name of the 
table", required = true) @PathParam("tableName") String tableName,
+                                               @ApiParam(value = 
"OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr) {
+    TableType tableType = Constants.validateTableType(tableTypeStr);
+    if (tableType == TableType.REALTIME)
+      throw new ControllerApplicationException(LOGGER,
+              "Table type : " + tableTypeStr + " not yet supported.", 
Status.NOT_IMPLEMENTED);
+
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));

Review comment:
       you already have tableType 3 lines above

##########
File path: 
pinot-server/src/main/java/org/apache/pinot/server/api/resources/SegmentColumnIndexesFetcher.java
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+public class SegmentColumnIndexesFetcher {
+  public static JsonNode getIndexesForSegmentColumns(SegmentDataManager 
segmentDataManager, Set<String> columnSet) {
+    ArrayNode columnsIndexMetadata = JsonUtils.newArrayNode();
+    if (segmentDataManager instanceof ImmutableSegmentDataManager) {
+      ImmutableSegmentDataManager immutableSegmentDataManager = 
(ImmutableSegmentDataManager) segmentDataManager;
+      ImmutableSegment immutableSegment = 
immutableSegmentDataManager.getSegment();
+      if (immutableSegment instanceof ImmutableSegmentImpl) {
+        ImmutableSegmentImpl immutableSegmentImpl = (ImmutableSegmentImpl) 
immutableSegment;
+//        Set<String> columns = 
immutableSegmentImpl.getSegmentMetadata().getAllColumns();

Review comment:
       remove comment

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.BiMap;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.restlet.resources.SegmentStatus;
+import org.apache.pinot.controller.api.resources.ServerSegmentMetadataReader;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TableMetadataReader {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableMetadataReader.class);
+
+  private final Executor executor;
+  private final HttpConnectionManager connectionManager;
+  private final PinotHelixResourceManager pinotHelixResourceManager;
+
+  public TableMetadataReader(Executor executor, HttpConnectionManager 
connectionManager,
+                             PinotHelixResourceManager helixResourceManager) {
+    this.executor = executor;
+    this.connectionManager = connectionManager;
+    this.pinotHelixResourceManager = helixResourceManager;
+  }
+
+  public TableReloadStatus getReloadStatus(String tableNameWithType, 
Map<String, List<String>> serverToSegmentsMap,

Review comment:
       can we move call to fetch serverToSegmentsMap inside this method, so 
that it's consistent with the getSegmentsMetadata Method?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -485,4 +493,91 @@ private void deleteSegmentsInternal(String 
tableNameWithType, List<String> segme
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.FORBIDDEN);
     }
   }
+
+  @GET
+  @Path("segments/{tableName}/reload-status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Status of segment reload", notes = "Status of segment 
reload")
+  public Map<String, TableMetadataReader.TableReloadStatus> getReloadStatus(
+          @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+          @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr) {
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+    Map<String, TableMetadataReader.TableReloadStatus> reloadStatusMap = new 
HashMap<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      TableMetadataReader.TableReloadStatus tableReloadStatus = null;
+      try {
+        tableReloadStatus = getSegmentsReloadStatus(tableNameWithType);
+      } catch (InvalidConfigException e) {
+        throw new ControllerApplicationException(LOGGER,
+                "Failed to load segment reload status for table: " + 
tableName, Status.NOT_FOUND);
+      }
+      if (Objects.isNull(tableReloadStatus))
+        throw new ControllerApplicationException(LOGGER,
+                "Table: " + tableName + " not found.", Status.NOT_FOUND);
+      reloadStatusMap.put(tableNameWithType, tableReloadStatus);
+    }
+    return reloadStatusMap;
+  }
+
+  private TableMetadataReader.TableReloadStatus getSegmentsReloadStatus(String 
tableNameWithType)
+          throws InvalidConfigException {
+    final Map<String, List<String>> serversToSegmentsMap =
+            
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    TableMetadataReader tableMetadataReader =
+            new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
+    return tableMetadataReader.getReloadStatus(tableNameWithType, 
serversToSegmentsMap,
+            _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+  }
+
+  @GET
+  @Path("segments/{tableName}/metadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the metadata for a segment", notes = "Get the 
metadata for a segment")
+  public Map<String, String> getServerMetadata(@ApiParam(value = "Name of the 
table", required = true) @PathParam("tableName") String tableName,
+                                               @ApiParam(value = 
"OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr) {
+    TableType tableType = Constants.validateTableType(tableTypeStr);
+    if (tableType == TableType.REALTIME)
+      throw new ControllerApplicationException(LOGGER,
+              "Table type : " + tableTypeStr + " not yet supported.", 
Status.NOT_IMPLEMENTED);
+
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+    Map<String, String> segmentsMetadata = null;
+    try {
+      segmentsMetadata = 
getSegmentsMetadataFromServer(tableNamesWithType.get(0));
+    } catch (InvalidConfigException e) {
+      throw new ControllerApplicationException(LOGGER,
+              "Failed to load segment reload status for table: " + tableName, 
Status.NOT_FOUND);
+    } catch (IOException ioe) {
+      throw new ControllerApplicationException(LOGGER,
+              "Error parsing response to cluster config!", 
Response.Status.BAD_REQUEST, ioe);
+    }
+    if (segmentsMetadata == null)
+      throw new ControllerApplicationException(LOGGER,
+              "Table: " + tableName + " not found.", Status.NOT_FOUND);
+    return segmentsMetadata;
+  }
+
+  private Map<String, String> getSegmentsMetadataFromServer(String 
tableNameWithType)
+          throws InvalidConfigException, IOException {
+    LOGGER.trace("Inside getSegmentsMetadataFromServer() entry");

Review comment:
       nit: remove trace logs?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -485,4 +493,91 @@ private void deleteSegmentsInternal(String 
tableNameWithType, List<String> segme
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.FORBIDDEN);
     }
   }
+
+  @GET
+  @Path("segments/{tableName}/reload-status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Status of segment reload", notes = "Status of segment 
reload")
+  public Map<String, TableMetadataReader.TableReloadStatus> getReloadStatus(
+          @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+          @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr) {
+    List<String> tableNamesWithType = getExistingTableNamesWithType(tableName, 
Constants.validateTableType(tableTypeStr));
+    Map<String, TableMetadataReader.TableReloadStatus> reloadStatusMap = new 
HashMap<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      TableMetadataReader.TableReloadStatus tableReloadStatus = null;
+      try {
+        tableReloadStatus = getSegmentsReloadStatus(tableNameWithType);
+      } catch (InvalidConfigException e) {
+        throw new ControllerApplicationException(LOGGER,
+                "Failed to load segment reload status for table: " + 
tableName, Status.NOT_FOUND);
+      }
+      if (Objects.isNull(tableReloadStatus))
+        throw new ControllerApplicationException(LOGGER,
+                "Table: " + tableName + " not found.", Status.NOT_FOUND);
+      reloadStatusMap.put(tableNameWithType, tableReloadStatus);
+    }
+    return reloadStatusMap;
+  }
+
+  private TableMetadataReader.TableReloadStatus getSegmentsReloadStatus(String 
tableNameWithType)
+          throws InvalidConfigException {
+    final Map<String, List<String>> serversToSegmentsMap =
+            
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);

Review comment:
       does this call return the CONSUMING (mutable) segments and do we need to 
skip them here? 
   Also how come we're supporting realtime table in reload-status, but not in 
metadata? I thought we're only planning to skip the CONSUMING segment in both 
endpoints

##########
File path: 
pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
##########
@@ -44,7 +48,7 @@
 
   @Test
   public void getTables()
-      throws Exception {
+          throws Exception {

Review comment:
       remove all the unnecessary whitespace additions in this file

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.BiMap;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.restlet.resources.SegmentStatus;
+import org.apache.pinot.controller.api.resources.ServerSegmentMetadataReader;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TableMetadataReader {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableMetadataReader.class);
+
+  private final Executor executor;
+  private final HttpConnectionManager connectionManager;
+  private final PinotHelixResourceManager pinotHelixResourceManager;
+
+  public TableMetadataReader(Executor executor, HttpConnectionManager 
connectionManager,
+                             PinotHelixResourceManager helixResourceManager) {
+    this.executor = executor;
+    this.connectionManager = connectionManager;
+    this.pinotHelixResourceManager = helixResourceManager;
+  }
+
+  public TableReloadStatus getReloadStatus(String tableNameWithType, 
Map<String, List<String>> serverToSegmentsMap,
+                                           int timeoutMs)
+          throws InvalidConfigException {
+    BiMap<String, String> endpoints = 
pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegmentsMap.keySet());
+    ServerSegmentMetadataReader serverSegmentMetadataReader = new 
ServerSegmentMetadataReader(executor, connectionManager);
+
+    List<SegmentStatus> segmentStatus = 
serverSegmentMetadataReader.getSegmentReloadTime(tableNameWithType, 
serverToSegmentsMap, endpoints, timeoutMs);
+    TableReloadStatus tableReloadStatus = new TableReloadStatus();
+    tableReloadStatus._tableName = tableNameWithType;
+    tableReloadStatus._segmentStatus = segmentStatus;
+    return tableReloadStatus;
+  }
+
+  public Map<String, String> getSegmentsMetadata(String tableNameWithType, int 
timeoutMs) throws InvalidConfigException, IOException {
+    final Map<String, List<String>> serversToSegmentsMap =
+            
pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    BiMap<String, String> endpoints = 
pinotHelixResourceManager.getDataInstanceAdminEndpoints(serversToSegmentsMap.keySet());
+    ServerSegmentMetadataReader serverSegmentMetadataReader = new 
ServerSegmentMetadataReader(executor, connectionManager);
+
+    List<String> segmentsMetadata = 
serverSegmentMetadataReader.getSegmentMetadataFromServer(tableNameWithType,
+            serversToSegmentsMap, endpoints, timeoutMs);
+
+    Map<String, String> response = new HashMap<>();
+    for (String segmentMetadata : segmentsMetadata) {
+      JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
+      ObjectNode objectNode = responseJson.deepCopy();

Review comment:
       didnt understand why this needs to be copied to another ObjectNode. Can 
we get it directly from JsonNode?

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentStatus.java
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import java.util.Objects;
+
+public class SegmentStatus {

Review comment:
       How about moving this also to TableReader just like TableReloadStatus? 
Also, if  you need the equals and hashcode, use EqualityUtils (look at Schema  
class for example)

##########
File path: 
pinot-server/src/main/java/org/apache/pinot/server/api/resources/SegmentColumnIndexesFetcher.java
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+public class SegmentColumnIndexesFetcher {
+  public static JsonNode getIndexesForSegmentColumns(SegmentDataManager 
segmentDataManager, Set<String> columnSet) {
+    ArrayNode columnsIndexMetadata = JsonUtils.newArrayNode();
+    if (segmentDataManager instanceof ImmutableSegmentDataManager) {
+      ImmutableSegmentDataManager immutableSegmentDataManager = 
(ImmutableSegmentDataManager) segmentDataManager;
+      ImmutableSegment immutableSegment = 
immutableSegmentDataManager.getSegment();
+      if (immutableSegment instanceof ImmutableSegmentImpl) {
+        ImmutableSegmentImpl immutableSegmentImpl = (ImmutableSegmentImpl) 
immutableSegment;
+//        Set<String> columns = 
immutableSegmentImpl.getSegmentMetadata().getAllColumns();
+        Map<String, ColumnIndexContainer> columnIndexContainerMap = 
immutableSegmentImpl.getIndexContainerMap();
+        
columnsIndexMetadata.add(getImmutableSegmentColumnIndexes(columnIndexContainerMap,
 columnSet));
+      }
+    }
+    return columnsIndexMetadata;
+  }
+
+  private static ObjectNode getImmutableSegmentColumnIndexes(Map<String, 
ColumnIndexContainer> columnIndexContainerMap,
+                                                             Set<String> 
columnSet) {
+    ObjectNode columnIndexMap = JsonUtils.newObjectNode();
+
+    for (Map.Entry<String, ColumnIndexContainer> e : 
columnIndexContainerMap.entrySet()) {
+      if (columnSet != null && !columnSet.contains(e.getKey())) {
+        continue;
+      }
+      ColumnIndexContainer columnIndexContainer = e.getValue();
+      ObjectNode indexesNode = JsonUtils.newObjectNode();
+      if (Objects.isNull(columnIndexContainer.getBloomFilter())) 
indexesNode.put("bloom-filter", "NO");
+      else indexesNode.put("bloom-filter", "YES");
+
+      if (Objects.isNull(columnIndexContainer.getDictionary())) 
indexesNode.put("dictionary", "NO");
+      else indexesNode.put("dictionary", "YES");
+
+      if (Objects.isNull(columnIndexContainer.getForwardIndex())) 
indexesNode.put("forward-index", "NO");
+      else indexesNode.put("forward-index", "YES");
+
+      if (Objects.isNull(columnIndexContainer.getInvertedIndex())) 
indexesNode.put("inverted-index", "NO");
+      else indexesNode.put("inverted-index", "YES");
+
+      if (Objects.isNull(columnIndexContainer.getNullValueVector()))
+        indexesNode.put("null-value-vector-reader", "NO");
+      else indexesNode.put("null-value-vector-reader", "YES");
+
+      if (Objects.isNull(columnIndexContainer.getNullValueVector())) 
indexesNode.put("range-index", "NO");
+      else indexesNode.put("range-index", "YES");
+
+      columnIndexMap.set(e.getKey(), indexesNode);
+    }
+    return columnIndexMap;

Review comment:
       any reason you're using ObjectNode instead of Map?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerSegmentMetadataReader.java
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.URI;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.pinot.common.http.MultiGetRequest;
+import org.apache.pinot.common.restlet.resources.SegmentStatus;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServerSegmentMetadataReader {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerSegmentMetadataReader.class);
+
+  private final Executor _executor;
+  private final HttpConnectionManager _connectionManager;
+
+  public ServerSegmentMetadataReader(Executor executor, HttpConnectionManager 
connectionManager) {
+    _executor = executor;
+    _connectionManager = connectionManager;
+  }
+
+  public List<String> getSegmentMetadataFromServer(String tableNameWithType,
+                                                   Map<String, List<String>> 
serversToSegmentsMap,
+                                                   BiMap<String, String> 
endpoints, int timeoutMs) {
+    LOGGER.info("Reading segment metadata from servers for table {}.", 
tableNameWithType);
+    List<String> serverURLs = new ArrayList<>();
+    for (Map.Entry<String, List<String>> serverToSegments : 
serversToSegmentsMap.entrySet()) {
+      List<String> segments = serverToSegments.getValue();
+      for (String segment : segments) {
+        serverURLs.add(generateSegmentMetadataServerURL(tableNameWithType, 
segment, endpoints.get(serverToSegments.getKey())));
+      }
+    }
+    CompletionService<GetMethod> completionService =
+            new MultiGetRequest(_executor, 
_connectionManager).execute(serverURLs, timeoutMs);
+    List<String> segmentsMetadata = new ArrayList<>();
+
+    BiMap<String, String> endpointsToServers = endpoints.inverse();
+    for (int i = 0; i < serverURLs.size(); i++) {
+      GetMethod getMethod = null;
+      try {
+        getMethod = completionService.take().get();
+        URI uri = getMethod.getURI();
+        String instance = endpointsToServers.get(uri.getHost() + ":" + 
uri.getPort());
+        if (getMethod.getStatusCode() >= 300) {
+          LOGGER.error("Server: {} returned error: {}", instance, 
getMethod.getStatusCode());
+          continue;
+        }
+        JsonNode segmentMetadata =
+                
JsonUtils.inputStreamToJsonNode(getMethod.getResponseBodyAsStream());
+        segmentsMetadata.add(JsonUtils.objectToString(segmentMetadata));
+        LOGGER.info("Updated segment metadata: {}", segmentMetadata.size());

Review comment:
       nit: this log is misleading "Updated metadata"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to