Jackie-Jiang commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1307681623


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -218,22 +226,51 @@ public synchronized void shutDown() {
   public void addRealtimeSegment(String realtimeTableName, String segmentName)
       throws Exception {
     LOGGER.info("Adding segment: {} to table: {}", segmentName, realtimeTableName);
-    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);
+    Pair<TableConfig, ZNRecord> tableConfigInfo =
+        ZKMetadataProvider.getTableConfigWithZNRecord(_propertyStore, realtimeTableName);
+    TableConfig tableConfig = tableConfigInfo.getLeft();
     Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", realtimeTableName);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
     Preconditions.checkState(schema != null, "Failed to find schema for table: %s", realtimeTableName);
     SegmentZKMetadata zkMetadata =
         ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, realtimeTableName, segmentName);
     Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName,
         realtimeTableName);
-    _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> createTableDataManager(k, tableConfig))
+    _tableDataManagerMap.computeIfAbsent(realtimeTableName,
+            k -> createTableDataManager(k, tableConfig, tableConfigInfo.getRight()))
         .addSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), zkMetadata);
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }
 
-  private TableDataManager createTableDataManager(String tableNameWithType, TableConfig tableConfig) {
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // We need to do an atomic snapshot here and then add segments because of the following race condition:
+    // say we have 8 Kafka partitions and are currently ingesting into 8 consuming segments, with 8 sealed
+    // segments (one per partition) already present. If we iterate through the 16 segments and add them one
+    // by one, one of them may get committed during the iteration and its ZK state updated; we would then
+    // read the stale ZK state (the commit happens after we read this config) and add the committed segment
+    // as a consuming segment. To avoid such race conditions, this method does a snapshot read of all the
+    // segments for this table in one shot.
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);

Review Comment:
   Why do we read the table config again here? We should pass in the one from the caller to ensure the version is correctly compared and set.
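   For example, a minimal sketch of passing the pair through (the changed signature below is illustrative, not from the PR; it only reuses calls already shown in this diff):
   ```java
   // Illustrative sketch: reuse the TableConfig/ZNRecord pair fetched by the caller
   // (addRealtimeSegment) instead of reading the table config from ZK a second time.
   private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName,
       Pair<TableConfig, ZNRecord> tableConfigInfo)
       throws Exception {
     TableConfig tableConfig = tableConfigInfo.getLeft();
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
     List<SegmentZKMetadata> segments = ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, realtimeTableName);
     for (SegmentZKMetadata segment : segments) {
       tableDataManager.addSegment(segment.getSegmentName(),
           new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), segment);
     }
   }
   ```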



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -310,6 +311,13 @@ public static TableConfig getTableConfig(ZkHelixPropertyStore<ZNRecord> property
         AccessOption.PERSISTENT));
   }
 
+  public static Pair<TableConfig, ZNRecord> getTableConfigWithZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    ZNRecord znRecord = propertyStore.get(constructPropertyStorePathForResourceConfig(tableNameWithType), null,

Review Comment:
   Can you double check if this will set the node version into the `ZNRecord`? 
I think you'll need to pass in a `Stat` to gather that info
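   One possible shape (sketch only; `toTableConfig` stands for whatever ZNRecord-to-TableConfig conversion `getTableConfig` already uses):
   ```java
   public static Pair<TableConfig, ZNRecord> getTableConfigWithZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
       String tableNameWithType) {
     // Pass an org.apache.zookeeper.data.Stat so the ZK node version is captured explicitly,
     // then stamp it on the record rather than relying on get() to populate it.
     Stat stat = new Stat();
     ZNRecord znRecord =
         propertyStore.get(constructPropertyStorePathForResourceConfig(tableNameWithType), stat, AccessOption.PERSISTENT);
     if (znRecord == null) {
       return Pair.of(null, null);
     }
     znRecord.setVersion(stat.getVersion());
     return Pair.of(toTableConfig(znRecord), znRecord);  // toTableConfig(): the existing conversion helper, assumed here
   }
   ```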



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java:
##########
@@ -86,6 +89,31 @@ public void reset() {
     LOGGER.info("Reset called");
   }
 
+  private class ReloadTableMessageHandler extends MessageHandler {
+    final String _tableNameWithType;
+    final boolean _force;
+
+    ReloadTableMessageHandler(TableReloadMessage tableReloadMessage, NotificationContext context) {
+      super(tableReloadMessage, context);
+      _tableNameWithType = tableReloadMessage.getTableNameWithType();
+      _force = tableReloadMessage.isForce();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      _instanceDataManager.reloadTable(_tableNameWithType, _force, _segmentRefreshSemaphore);
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      LOGGER.error("Got error while refreshing table config for table: {} 
(error code: {}, error type: {})",

Review Comment:
   (minor) Revise the error message
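   For example (wording illustrative), something that reflects that this handler reloads the whole table rather than just refreshing its config:
   ```java
   LOGGER.error("Caught exception while reloading table: {} (error code: {}, error type: {})", _tableNameWithType,
       code, type, e);
   ```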



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -242,6 +242,30 @@ public void addRealtimeSegment(String realtimeTableName, String segmentName)
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }
 
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // We need to do an atomic snapshot here and then add segments because of the following race condition:

Review Comment:
   I did some research, and it seems the behavior of `get()` during `compute()` on the same key is Java-version dependent (it might block), but is typically non-blocking.
   Assuming it is non-blocking, we need to make all other write operations (e.g. add segment, remove segment, reload segment, etc.) blocking so that they are not applied to the old data manager.
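   One way to get that (sketch only; the helper name is made up): funnel the write paths through `compute()` on the same key so they contend on the same per-key lock as the data-manager swap:
   ```java
   // Illustrative helper inside HelixInstanceDataManager: runs a write operation under the
   // ConcurrentHashMap per-key lock so it cannot race with a concurrent data-manager replacement.
   // Requires java.util.function.Consumer.
   private void withTableDataManager(String tableNameWithType, Consumer<TableDataManager> writeOp) {
     _tableDataManagerMap.compute(tableNameWithType, (k, tableDataManager) -> {
       if (tableDataManager != null) {
         writeOp.accept(tableDataManager);
       }
       return tableDataManager;  // keep the current mapping unchanged
     });
   }
   ```
   Write operations that throw checked exceptions would need a small wrapper, but the idea is the same.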



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -104,6 +109,72 @@ public Response pauseConsumption(
     }
   }
 
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/reload")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.RELOAD_TABLE)
+  @ApiOperation(value = "Reloads the table across all the servers", notes = "This would reconstruct the table data "
+      + "manager in case of configuration changes. Example usage: trigger after converting the upsert mode "
+      + "from full to partial.")
+  public SuccessResponse reloadTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName")
+      String tableName,
+      @ApiParam(value = "Whether to force the reload (if table config is not updated, reload will not be done)")
+      @QueryParam("force") boolean force,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      @Context HttpHeaders httpHeaders, @Context Request request) {
+    List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
+        tableName, Constants.validateTableType(tableTypeStr), LOGGER);
+    if (tableNamesWithType.isEmpty()) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find table: " + tableName,
+          Response.Status.NOT_FOUND);
+    }
+    // TODO: check if another ongoing reload job exists and fail the request.
+    // This cannot be done right now because the reload controller job doesn't have a status field.
+    StringBuilder response = new StringBuilder();
+    for (String tableNameWithType : tableNamesWithType) {
+      Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadTable(tableNameWithType, force);
+      boolean zkJobMetaWriteSuccess = false;
+      if (msgInfo.getLeft() > 0) {
+        try {
+          if (_pinotHelixResourceManager.addNewReloadTableJob(tableNameWithType, msgInfo.getRight(),
+              msgInfo.getLeft())) {
+            zkJobMetaWriteSuccess = true;
+          } else {
+            LOGGER.error("Failed to add reload table job meta into zookeeper for table: {}", tableNameWithType);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Failed to add reload table job meta into zookeeper for table: {}", tableNameWithType, e);
+        }
+        response.append(String.format("Submitted reload table job for table: %s, with id: %s, sent %d reload messages."
+                + " Job meta ZK storage status: %s", tableNameWithType, msgInfo.getRight(), msgInfo.getLeft(),
+            zkJobMetaWriteSuccess ? "SUCCESS" : "FAILED")).append(". ");
+      } else {
+        throw new ControllerApplicationException(LOGGER, "Failed to find table: " + tableNameWithType,
+            Response.Status.NOT_FOUND);
+      }
+    }
+    return new SuccessResponse(response.toString().trim());
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/tableReloadStatus/{jobId}")
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/tableReloadStatus/{jobId}")

Review Comment:
   Need `@Authorize` here as well
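   Since the job-status path has no table name to bind to, something cluster-scoped along these lines could work (the action constant below is hypothetical; use whichever existing cluster-level action fits):
   ```java
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tables/tableReloadStatus/{jobId}")
   // Hypothetical action constant -- pick the appropriate one from the existing Actions.Cluster list.
   @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_RELOAD_STATUS)
   ```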



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -218,22 +226,51 @@ public synchronized void shutDown() {
   public void addRealtimeSegment(String realtimeTableName, String segmentName)
       throws Exception {
     LOGGER.info("Adding segment: {} to table: {}", segmentName, realtimeTableName);
-    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);
+    Pair<TableConfig, ZNRecord> tableConfigInfo =
+        ZKMetadataProvider.getTableConfigWithZNRecord(_propertyStore, realtimeTableName);
+    TableConfig tableConfig = tableConfigInfo.getLeft();
     Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", realtimeTableName);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
     Preconditions.checkState(schema != null, "Failed to find schema for table: %s", realtimeTableName);
     SegmentZKMetadata zkMetadata =
         ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, realtimeTableName, segmentName);
     Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName,
         realtimeTableName);
-    _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> createTableDataManager(k, tableConfig))
+    _tableDataManagerMap.computeIfAbsent(realtimeTableName,
+            k -> createTableDataManager(k, tableConfig, tableConfigInfo.getRight()))
         .addSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), zkMetadata);
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }
 
-  private TableDataManager createTableDataManager(String tableNameWithType, TableConfig tableConfig) {
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // We need to do an atomic snapshot here and then add segments because of the following race condition:
+    // say we have 8 Kafka partitions and are currently ingesting into 8 consuming segments, with 8 sealed
+    // segments (one per partition) already present. If we iterate through the 16 segments and add them one
+    // by one, one of them may get committed during the iteration and its ZK state updated; we would then
+    // read the stale ZK state (the commit happens after we read this config) and add the committed segment
+    // as a consuming segment. To avoid such race conditions, this method does a snapshot read of all the
+    // segments for this table in one shot.
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);
+    Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", realtimeTableName);
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
+    List<SegmentZKMetadata> segments = ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, realtimeTableName);
+    for (SegmentZKMetadata segment : segments) {
+      String segmentName = segment.getSegmentName();
+      tableDataManager.addSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema),

Review Comment:
   This API is used for REALTIME tables only; it will throw an exception for OFFLINE tables.
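   A cheap guard (sketch, assuming `TableNameBuilder.isRealtimeTableResource`) would make the constraint explicit instead of failing later inside `addSegment`:
   ```java
   // Fail fast for OFFLINE tables before snapshotting consuming segments.
   Preconditions.checkState(TableNameBuilder.isRealtimeTableResource(realtimeTableName),
       "snapshotAndAddSegments is only supported for REALTIME tables, got: %s", realtimeTableName);
   ```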


