sajjad-moradi commented on code in PR #8986:
URL: https://github.com/apache/pinot/pull/8986#discussion_r918204110

##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java:
##########
@@ -169,6 +173,34 @@ public HelixTaskResult handleMessage()
       }
     }

+  private class ForceCommitMessageHandler extends DefaultMessageHandler {
+
+    private String _tableName;
+    private Set<String> _segmentNames;
+
+    public ForceCommitMessageHandler(ForceCommitMessage forceCommitMessage, ServerMetrics metrics,
+        NotificationContext ctx) {
+      super(forceCommitMessage, metrics, ctx);
+      _tableName = forceCommitMessage.getTableName();
+      _segmentNames = forceCommitMessage.getSegmentNames();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage()
+        throws InterruptedException {
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      _logger.info("Handling force commit message for table {} segments {}", _tableName, _segmentNames);
+      try {
+        _instanceDataManager.forceCommit(_tableName, _segmentNames);
+        helixTaskResult.setSuccess(true);
+      } catch (Exception e) {
+        _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETE_TABLE_FAILURES, 1);
+        Utils.rethrowException(e);

Review Comment:
   Helix doesn't retry. It just logs the exception.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1399,4 +1417,120 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
       }
     }
   }
+
+  /**
+   * Pause consumption on a table by
+   * 1) setting "isTablePaused" in ideal states to true and
+   * 2) sending force commit messages to servers
+   */
+  public PauseStatus pauseConsumption(String tableNameWithType) {
+    IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, true);
+    Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
+    sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+    return new PauseStatus(true, consumingSegments, consumingSegments.isEmpty() ? null : "Pause flag is set."
+        + " Consuming segments are being committed."
+        + " Use /pauseStatus endpoint in a few moments to check if all consuming segments have been committed.");
+  }
+
+  /**
+   * Resume consumption on a table by
+   * 1) setting "isTablePaused" in ideal states to false and
+   * 2) triggering segment validation job to create new consuming segments in ideal states
+   */
+  public PauseStatus resumeConsumption(String tableNameWithType) {
+    IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, false);
+
+    // trigger realtime segment validation job to resume consumption
+    Map<String, String> taskProperties = new HashMap<>();
+    taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");

Review Comment:
   Created the issue: #9039

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1399,4 +1417,120 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
       }
     }
   }
+
+  /**
+   * Pause consumption on a table by
+   * 1) setting "isTablePaused" in ideal states to true and
+   * 2) sending force commit messages to servers
+   */
+  public PauseStatus pauseConsumption(String tableNameWithType) {
+    IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, true);
+    Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
+    sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+    return new PauseStatus(true, consumingSegments, consumingSegments.isEmpty() ? null : "Pause flag is set."

Review Comment:
   This is not internally used. We return the pause status to the client.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.