Jackie-Jiang commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1307681623
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -218,22 +226,51 @@ public synchronized void shutDown() {
   public void addRealtimeSegment(String realtimeTableName, String segmentName)
       throws Exception {
     LOGGER.info("Adding segment: {} to table: {}", segmentName, realtimeTableName);
-    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);
+    Pair<TableConfig, ZNRecord> tableConfigInfo =
+        ZKMetadataProvider.getTableConfigWithZNRecord(_propertyStore, realtimeTableName);
+    TableConfig tableConfig = tableConfigInfo.getLeft();
     Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", realtimeTableName);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
     Preconditions.checkState(schema != null, "Failed to find schema for table: %s", realtimeTableName);
     SegmentZKMetadata zkMetadata =
         ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, realtimeTableName, segmentName);
     Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName,
         realtimeTableName);
-    _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> createTableDataManager(k, tableConfig))
+    _tableDataManagerMap.computeIfAbsent(realtimeTableName,
+            k -> createTableDataManager(k, tableConfig, tableConfigInfo.getRight()))
         .addSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), zkMetadata);
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }

-  private TableDataManager createTableDataManager(String tableNameWithType, TableConfig tableConfig) {
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // we need to do an atomic snapshot here and then add segments because
+    // of the following race condition
+    // say we have 8 kafka partitions and we are ingesting into 8 consuming segments at the moment
+    // say there were 8 sealed segments before this (one for each partition)
+    // now, if we end up iterating through the 16 segments and add them one by one
+    // say during this progress one of them becomes committed and the state would be updated
+    // then we would read the stale zk state (the commit happens after we read this config)
+    // and add the committed segment as consuming segment
+    // to avoid such race conditions, this method does a snapshot-ed read of all the segments
+    // for this table in one shot
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);

Review Comment:
Why do we read the table config again here? We should pass in the one from the caller to ensure the version is correctly compared and set.

##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -310,6 +311,13 @@ public static TableConfig getTableConfig(ZkHelixPropertyStore<ZNRecord> property
         AccessOption.PERSISTENT));
   }

+  public static Pair<TableConfig, ZNRecord> getTableConfigWithZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    ZNRecord znRecord = propertyStore.get(constructPropertyStorePathForResourceConfig(tableNameWithType), null,

Review Comment:
Can you double-check whether this will set the node version into the `ZNRecord`?
I think you'll need to pass in a `Stat` to gather that info.

##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java:
##########
@@ -86,6 +89,31 @@ public void reset() {
     LOGGER.info("Reset called");
   }

+  private class ReloadTableMessageHandler extends MessageHandler {
+    final String _tableNameWithType;
+    final boolean _force;
+
+    ReloadTableMessageHandler(TableReloadMessage tableReloadMessage, NotificationContext context) {
+      super(tableReloadMessage, context);
+      _tableNameWithType = tableReloadMessage.getTableNameWithType();
+      _force = tableReloadMessage.isForce();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      _instanceDataManager.reloadTable(_tableNameWithType, _force, _segmentRefreshSemaphore);
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      LOGGER.error("Got error while refreshing table config for table: {} (error code: {}, error type: {})",

Review Comment:
(minor) Revise the error message.

##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -242,6 +242,30 @@ public void addRealtimeSegment(String realtimeTableName, String segmentName)
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }

+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // we need to do an atomic snapshot here and then add segments because

Review Comment:
I did some research, and it seems the behavior of `get()` during `compute()` on the same key is Java-version dependent (it might block), but it is typically non-blocking. Assuming it is non-blocking, we need to make all other write operations (e.g. add segment, remove segment, reload segment) blocking so that they are not applied to the old data manager.
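A minimal sketch of the blocking scheme described in the comment above, assuming a per-table `ReentrantReadWriteLock`; this is not code from the PR, and `SketchTableRegistry` and `SketchTableManager` are hypothetical names. The `ConcurrentHashMap` Javadoc states that retrieval operations such as `get()` generally do not block, so a segment write that fetches the manager with a plain `get()` can still see the old instance while a reload is replacing it. In the sketch, segment writes share the read lock and the table reload holds the write lock for the whole swap, so no write can straddle it; lock-free `get()` stays available for readers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for a table data manager: it only records segment names.
final class SketchTableManager {
  final int generation;
  final List<String> segments = Collections.synchronizedList(new ArrayList<>());

  SketchTableManager(int generation) {
    this.generation = generation;
  }
}

// Hypothetical registry: keeps segment writes off a manager that is being replaced.
final class SketchTableRegistry {
  private final ConcurrentHashMap<String, SketchTableManager> managers = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, ReentrantReadWriteLock> locks = new ConcurrentHashMap<>();

  private ReentrantReadWriteLock lockFor(String table) {
    return locks.computeIfAbsent(table, k -> new ReentrantReadWriteLock());
  }

  // Segment-level writes share the read lock: they run in parallel with each other,
  // but never while the table manager is being swapped.
  void addSegment(String table, String segment) {
    ReentrantReadWriteLock.ReadLock lock = lockFor(table).readLock();
    lock.lock();
    try {
      managers.computeIfAbsent(table, k -> new SketchTableManager(0)).segments.add(segment);
    } finally {
      lock.unlock();
    }
  }

  void removeSegment(String table, String segment) {
    ReentrantReadWriteLock.ReadLock lock = lockFor(table).readLock();
    lock.lock();
    try {
      SketchTableManager manager = managers.get(table);
      if (manager != null) {
        manager.segments.remove(segment);
      }
    } finally {
      lock.unlock();
    }
  }

  // The reload holds the write lock for the whole swap: earlier segment writes have finished,
  // and later ones block until the new manager is published.
  void reloadTable(String table) {
    ReentrantReadWriteLock.WriteLock lock = lockFor(table).writeLock();
    lock.lock();
    try {
      SketchTableManager old = managers.get(table);
      SketchTableManager fresh = new SketchTableManager(old == null ? 0 : old.generation + 1);
      if (old != null) {
        // Stands in for snapshotAndAddSegments; here the old segment list is simply copied.
        fresh.segments.addAll(old.segments);
      }
      managers.put(table, fresh);
    } finally {
      lock.unlock();
    }
  }

  // Readers stay lock-free and may briefly see the previous manager.
  SketchTableManager get(String table) {
    return managers.get(table);
  }
}
```

Using the read lock for segment writes keeps them parallel with each other; only the manager swap is exclusive.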
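One way to act on the two versioning comments earlier in this review (reuse the caller's table config, and obtain the node version through a `Stat`) is to read the config and its version in a single call and pass that pair down, so the version that is compared is the same one that is recorded. In Helix, `BaseDataAccessor#get(String path, Stat stat, int options)` fills in a ZooKeeper `Stat` whose `getVersion()` is the node version; the `null` second argument in the `getTableConfigWithZNRecord` diff appears to be that `Stat` slot, and whether the `ZNRecord` version is populated without it is what the comment asks to verify. The sketch is self-contained and hypothetical: `FakeConfigStore`, `VersionStat`, `VersionedTableManager`, and `VersionedReloader` stand in for the property store, `Stat`, the table data manager, and the server-side reload path, and the `force` behavior follows the reload endpoint's parameter description (no reload when the table config is unchanged).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.zookeeper.data.Stat; only the node version is modeled.
final class VersionStat {
  int version = -1;
}

// Hypothetical in-memory config store. Like a ZooKeeper node, a config starts at
// version 0 when first written and the version increases by one on every later write.
final class FakeConfigStore {
  private final Map<String, String> configs = new HashMap<>();
  private final Map<String, Integer> versions = new HashMap<>();

  synchronized void write(String table, String config) {
    configs.put(table, config);
    versions.merge(table, 0, (old, ignored) -> old + 1);
  }

  // Returns the config and fills the caller's stat in the same read, so the two always match.
  synchronized String read(String table, VersionStat stat) {
    stat.version = versions.getOrDefault(table, -1);
    return configs.get(table);
  }
}

// Hypothetical table data manager that remembers which config version it was built from.
final class VersionedTableManager {
  final String config;
  final int configVersion;

  VersionedTableManager(String config, int configVersion) {
    this.config = config;
    this.configVersion = configVersion;
  }
}

final class VersionedReloader {
  // Reads the config once and uses that same (config, version) pair both for the comparison
  // and for the new manager, instead of re-reading the config further down the call chain.
  static VersionedTableManager reload(FakeConfigStore store, String table, VersionedTableManager current,
      boolean force) {
    VersionStat stat = new VersionStat();
    String config = store.read(table, stat);
    if (config == null) {
      throw new IllegalStateException("Failed to find table config for table: " + table);
    }
    if (!force && current != null && current.configVersion == stat.version) {
      return current; // config unchanged and not forced: keep the existing manager
    }
    return new VersionedTableManager(config, stat.version);
  }
}
```

With `force` unset, an unchanged version turns the reload into a no-op; with `force` set, a new manager is built from the same config version.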
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -104,6 +109,72 @@ public Response pauseConsumption(
     }
   }

+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/reload")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.RELOAD_TABLE)
+  @ApiOperation(value = "Reloads the table across all the servers", notes = "This would reconstruct the table data"
+      + "manager in case of configuration changes. Example usage: trigger after converting the upsert mode"
+      + "from full to partial.")
+  public SuccessResponse reloadTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName")
+          String tableName,
+      @ApiParam(value = "Whether to force the reload (if table config is not updated, reload will not be done)")
+      @QueryParam("force") boolean force,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      @Context HttpHeaders httpHeaders, @Context Request request) {
+    List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
+        tableName, Constants.validateTableType(tableTypeStr), LOGGER);
+    if (tableNamesWithType.isEmpty()) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find table: " + tableName,
+          Response.Status.NOT_FOUND);
+    }
+    // TODO: check if another ongoing reload job exists and fail the request
+    // this cannot be done right now because reload controller job doesn't have a status field
+    StringBuilder response = new StringBuilder();
+    for (String tableNameWithType: tableNamesWithType) {
+      Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadTable(tableNameWithType, force);
+      boolean zkJobMetaWriteSuccess = false;
+      if (msgInfo.getLeft() > 0) {
+        try {
+          if (_pinotHelixResourceManager.addNewReloadTableJob(tableNameWithType, msgInfo.getRight(),
+              msgInfo.getLeft())) {
+            zkJobMetaWriteSuccess = true;
+          } else {
+            LOGGER.error("Failed to add reload table job meta into zookeeper for table: {}", tableNameWithType);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}", tableNameWithType, e);
+        }
+        response.append(String.format("Submitted reload table job for table: %s, with id: %s, sent %d reload messages."
+            + " Job meta ZK storage status: %s", tableNameWithType, msgInfo.getRight(), msgInfo.getLeft(),
+            zkJobMetaWriteSuccess ? "SUCCESS" : "FAILED")).append(". ");
+      } else {
+        throw new ControllerApplicationException(LOGGER, "Failed to find table: " + tableNameWithType,
+            Response.Status.NOT_FOUND);
+      }
+    }
+    return new SuccessResponse(response.toString().trim());
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/tableReloadStatus/{jobId}")

Review Comment:
Need `@Authorize` here as well.

##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -218,22 +226,51 @@ public synchronized void shutDown() {
   public void addRealtimeSegment(String realtimeTableName, String segmentName)
       throws Exception {
     LOGGER.info("Adding segment: {} to table: {}", segmentName, realtimeTableName);
-    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);
+    Pair<TableConfig, ZNRecord> tableConfigInfo =
+        ZKMetadataProvider.getTableConfigWithZNRecord(_propertyStore, realtimeTableName);
+    TableConfig tableConfig = tableConfigInfo.getLeft();
     Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", realtimeTableName);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
     Preconditions.checkState(schema != null, "Failed to find schema for table: %s", realtimeTableName);
     SegmentZKMetadata zkMetadata =
         ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, realtimeTableName, segmentName);
     Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName,
         realtimeTableName);
-    _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> createTableDataManager(k, tableConfig))
+    _tableDataManagerMap.computeIfAbsent(realtimeTableName,
+            k -> createTableDataManager(k, tableConfig, tableConfigInfo.getRight()))
         .addSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), zkMetadata);
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }

-  private TableDataManager createTableDataManager(String tableNameWithType, TableConfig tableConfig) {
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // we need to do an atomic snapshot here and then add segments because
+    // of the following race condition
+    // say we have 8 kafka partitions and we are ingesting into 8 consuming segments at the moment
+    // say there were 8 sealed segments before this (one for each partition)
+    // now, if we end up iterating through the 16 segments and add them one by one
+    // say during this progress one of them becomes committed and the state would be updated
+    // then we would read the stale zk state (the commit happens after we read this config)
+    // and add the committed segment as consuming segment
+    // to avoid such race conditions, this method does a snapshot-ed read of all the segments
+    // for this table in one shot
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);
+    Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", realtimeTableName);
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
+    List<SegmentZKMetadata> segments = ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, realtimeTableName);
+    for (SegmentZKMetadata segment : segments) {
+      String segmentName = segment.getSegmentName();
+      tableDataManager.addSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema),

Review Comment:
This API is used for RT tables only. It will throw an exception for OFFLINE tables.

-- 
This is an automated message from the Apache Git Service.