Jackie-Jiang commented on code in PR #11183: URL: https://github.com/apache/pinot/pull/11183#discussion_r1281223068
########## pinot-common/src/main/java/org/apache/pinot/common/messages/TableReloadMessage.java: ########## @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.common.messages; + +import java.util.UUID; +import org.apache.helix.model.Message; +import org.apache.helix.zookeeper.datamodel.ZNRecord; + +/** + * This (Helix) message is sent from the controller to brokers when a request is received to reload the table. + * + * NOTE: We keep the table name as a separate key instead of using the Helix PARTITION_NAME so that this message can be + * used for any resource. 
+ */ +public class TableReloadMessage extends Message { + public static final String RELOAD_TABLE_MSG_SUB_TYPE = "RELOAD_TABLE"; Review Comment: (format) Please reformat all the changes with [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide) ########## pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java: ########## @@ -19,5 +19,5 @@ package org.apache.pinot.common.metadata.controllerjob; public enum ControllerJobType { - RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE + RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, RELOAD_TABLE Review Comment: (minor) Move it right after `RELOAD_SEGMENT` ########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java: ########## @@ -242,6 +242,30 @@ public void addRealtimeSegment(String realtimeTableName, String segmentName) LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName); } + private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName) + throws Exception { + // we need to do an atomic snapshot here and then add segments because Review Comment: This one has a race condition because when we create the new table data manager, there can be other operations applied to the old table data manager (add/remove/replace segment) which are not reflected in the new table data manager.
We need to lock up the old table data manager until it is fully replaced ########## pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java: ########## @@ -420,10 +420,14 @@ private void setUpPinotController() { new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager), _config.getPinotTaskExpireTimeInMs()); + _connectionManager = new MultiThreadedHttpConnectionManager(); + _connectionManager.getParams().setConnectionTimeout(_config.getServerAdminRequestTimeoutSeconds() * 1000); + // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager LOGGER.info("Starting realtime segment manager"); _pinotLLCRealtimeSegmentManager = - new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics); + new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics, _executorService, Review Comment: Revert the changes in this file ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -170,11 +172,13 @@ public class PinotLLCRealtimeSegmentManager { private final int _deepstoreUploadRetryTimeoutMs; private final FileUploadDownloadClient _fileUploadDownloadClient; private final AtomicInteger _numCompletingSegments = new AtomicInteger(0); + private final Executor _executor; + private final HttpConnectionManager _connectionManager; private volatile boolean _isStopping = false; public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf, - ControllerMetrics controllerMetrics) { + ControllerMetrics controllerMetrics, Executor executor, HttpConnectionManager connectionManager) { Review Comment: These 2 fields are not used. 
Please revert the related changes ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java: ########## @@ -104,6 +110,70 @@ public Response pauseConsumption( } } + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tables/{tableName}/reload") + @ApiOperation(value = "Reloads the table across all the servers", notes = "This would reconstruct the table data" + + "manager in case of configuration changes. Example usage: trigger after converting the upsert mode" + + "from full to partial.") + public SuccessResponse reloadTable( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") @Nullable + String tableName, + @ApiParam(value = "Whether to force the reload (if table config is not updated, reload will not be done)") + @QueryParam("force") boolean force, + @Context HttpHeaders httpHeaders, @Context Request request) { + String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); Review Comment: This should apply to both offline and real-time tables ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java: ########## @@ -104,6 +110,70 @@ public Response pauseConsumption( } } + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tables/{tableName}/reload") + @ApiOperation(value = "Reloads the table across all the servers", notes = "This would reconstruct the table data" + + "manager in case of configuration changes.
Example usage: trigger after converting the upsert mode" + "from full to partial.") + public SuccessResponse reloadTable( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") @Nullable + String tableName, + @ApiParam(value = "Whether to force the reload (if table config is not updated, reload will not be done)") + @QueryParam("force") boolean force, + @Context HttpHeaders httpHeaders, @Context Request request) { + String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); + validate(tableNameWithType); + // checking if another reload job exists + // if so, fail the request with the ongoing reload job's id + Map<String, Map<String, String>> reloadJobs = + _pinotHelixResourceManager.getAllJobsForTable(tableNameWithType, + Collections.singleton(ControllerJobType.RELOAD_TABLE)); + if (reloadJobs.size() > 0) { + String jobId = reloadJobs.keySet().iterator().next(); Review Comment: This doesn't seem correct. We should still allow table reload even if the table was reloaded before. We track both the current jobs and the old finished jobs ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java: ########## @@ -20,15 +20,25 @@ import java.util.Map; + public class ServerReloadControllerJobStatusResponse { private double _timeElapsedInMinutes; private double _estimatedTimeRemainingInMinutes; private int _totalSegmentCount; + private int _totalCount; Review Comment: What does total count stand for?
Seems you use it for both segment count and server count, but we already have stats for both of them ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java: ########## @@ -104,6 +110,70 @@ public Response pauseConsumption( } } + @POST + @Produces(MediaType.APPLICATION_JSON) Review Comment: Add `@Authorize` annotation to these APIs ########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java: ########## @@ -438,4 +439,49 @@ public int getReplication() { } return replication; } + + @Override + public boolean equals(Object o) { Review Comment: No need to override them ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java: ########## @@ -696,44 +696,9 @@ public ServerReloadControllerJobStatusResponse getReloadJobStatus( totalSegments += entry.getValue().size(); } serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments); - serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size()); - serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount); - - for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) { - String responseString = streamResponse.getValue(); - try { - ServerReloadControllerJobStatusResponse response = - JsonUtils.stringToObject(responseString, ServerReloadControllerJobStatusResponse.class); - serverReloadControllerJobStatusResponse.setSuccessCount( - serverReloadControllerJobStatusResponse.getSuccessCount() + response.getSuccessCount()); - } catch (Exception e) { - serverReloadControllerJobStatusResponse.setTotalServerCallsFailed( - serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1 - ); - } - } - - // Add ZK fields - serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata); - - // Add derived fields - long submissionTime = - 
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)); - double timeElapsedInMinutes = ((double) System.currentTimeMillis() - (double) submissionTime) / (1000.0 * 60.0); - int remainingSegments = serverReloadControllerJobStatusResponse.getTotalSegmentCount() - - serverReloadControllerJobStatusResponse.getSuccessCount(); - - double estimatedRemainingTimeInMinutes = -1; - if (serverReloadControllerJobStatusResponse.getSuccessCount() > 0) { - estimatedRemainingTimeInMinutes = - ((double) remainingSegments / (double) serverReloadControllerJobStatusResponse.getSuccessCount()) - * timeElapsedInMinutes; - } - - serverReloadControllerJobStatusResponse.setTimeElapsedInMinutes(timeElapsedInMinutes); - serverReloadControllerJobStatusResponse.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes); - - return serverReloadControllerJobStatusResponse; + serverReloadControllerJobStatusResponse.setTotalCount(totalSegments); + return ResourceUtils.buildServerReloadControllerJobStatusResponse(controllerJobZKMetadata, Review Comment: Can this logic be shared? 
For table reload, there is a single target, and I don't see the point of tracking the percentage ########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java: ########## @@ -509,6 +544,44 @@ public void addOrReplaceSegment(String tableNameWithType, String segmentName) } } + @Override + public void reloadTable(String tableNameWithType, boolean force, SegmentRefreshSemaphore segmentRefreshSemaphore) { + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); + AtomicReference<TableDataManager> toBeShutdown = new AtomicReference<>(); + _tableDataManagerMap.computeIfPresent(tableNameWithType, (k, tdm) -> { + // create only if a table data manager doesn't exist + // or the current configuration and the new configuration are different + if (!force && tdm.getTableDataManagerConfig().getTableConfig().equals(tableConfig)) { Review Comment: We can compare the ZNode version of the table config to detect whether it has changed ########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java: ########## @@ -509,6 +544,44 @@ public void addOrReplaceSegment(String tableNameWithType, String segmentName) } } + @Override + public void reloadTable(String tableNameWithType, boolean force, SegmentRefreshSemaphore segmentRefreshSemaphore) { + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); + AtomicReference<TableDataManager> toBeShutdown = new AtomicReference<>(); + _tableDataManagerMap.computeIfPresent(tableNameWithType, (k, tdm) -> { + // create only if a table data manager doesn't exist + // or the current configuration and the new configuration are different + if (!force && tdm.getTableDataManagerConfig().getTableConfig().equals(tableConfig)) { + LOGGER.info("Not reloading table as the table data manager config is not changed"); + return tdm; + } + // this will load all the segments and start a new consuming segment
+ // for the time being we will be consuming 2X the memory here + // once all the segments are loaded we swap the new table data manager with the old one + LOGGER.info("Recreating table data manager for table: {} as we received a reload table request", + tableNameWithType); + TableDataManager newTdm = createTableDataManager(k, tableConfig); + List<SegmentDataManager> segments = tdm.acquireAllSegments(); + try { + snapshotAndAddSegments(newTdm, tableNameWithType); + } catch (Exception e) { + throw new RuntimeException(e); + } + for (SegmentDataManager sdm : segments) { + tdm.releaseSegment(sdm); + } + LOGGER.info("All segments loaded for table: {}", tableNameWithType); + toBeShutdown.set(tdm); + return newTdm; + }); + // shutting the old table data manager + TableDataManager oldTdm = toBeShutdown.get(); Review Comment: We cannot directly shut down the data manager: 1. Segments are not released yet 2. There might be queries still on the old data manager -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org