AlexanderKM commented on code in PR #16867:
URL: https://github.com/apache/pinot/pull/16867#discussion_r2377002072
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2552,51 +2554,117 @@ public List<Tier> getSortedTiers(TableConfig
tableConfig) {
TierFactory.PINOT_SERVER_STORAGE_TYPE);
}
- public void assignSegment(TableConfig tableConfig, SegmentZKMetadata
segmentZKMetadata) {
+ public void assignSegmentWithRetry(TableConfig tableConfig,
SegmentZKMetadata segmentZKMetadata) {
String tableNameWithType = tableConfig.getTableName();
String segmentName = segmentZKMetadata.getSegmentName();
- // Assign instances for the segment and add it into IdealState
try {
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
- // TODO: Support direct tier assignment for UPLOADED real-time segments
- if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
- instancePartitionsMap = getInstacePartitionsMap(tableConfig,
segmentZKMetadata.getTier());
+ assignSegmentInternal(tableConfig, segmentZKMetadata);
+ } catch (Exception e) {
+ if (containsException(e, ZkInterruptedException.class)) {
+ LOGGER.warn("Encountered ZkInterruptedException while assigning
segment: {} to table: {}. Retrying once...",
+ segmentName, tableNameWithType);
+
+ try {
+ assignSegmentInternal(tableConfig, segmentZKMetadata);
+ LOGGER.info("Successfully assigned segment: {} to table: {} on
retry", segmentName, tableNameWithType);
+ return;
+ } catch (Exception retryException) {
+ LOGGER.error("Retry failed for assigning segment: {} to table {}.
Proceeding with cleanup.",
+ segmentName, tableNameWithType, retryException);
+ }
+ }
+
+ handleAssignmentFailure(tableNameWithType, segmentName, e);
+ }
+ }
+
+ private void handleAssignmentFailure(String tableNameWithType, String
segmentName, Exception originalException) {
+ LOGGER.error(
+ "Caught exception while adding segment: {} to IdealState for table:
{}, deleting segment ZK metadata",
+ segmentName, tableNameWithType, originalException);
+ if (removeSegmentZKMetadata(tableNameWithType, segmentName)) {
+ LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}",
segmentName, tableNameWithType);
+ } else {
+ LOGGER.error("Failed to delete segment ZK metadata for segment: {} of
table: {}", segmentName,
+ tableNameWithType);
+ }
+
+ if (containsException(originalException, ZkInterruptedException.class)) {
+ LOGGER.warn("Encountered ZkInterruptedException while assigning segment:
{} to table: {}. "
+ + "Deleting segment to prevent inconsistent state.",
+ segmentName, tableNameWithType);
+
+ PinotResourceManagerResponse response = deleteSegment(tableNameWithType,
segmentName);
+ String errorMessage;
+ if (!response.isSuccessful()) {
+ errorMessage =
+ String.format("Failed to delete segment: %s of table: %s after
ZkInterruptedException. Response: %s",
+ segmentName, tableNameWithType, response.getMessage());
} else {
- instancePartitionsMap =
fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+ errorMessage = String.format(
+ "Failed to assign segment: %s to table: %s due to
ZkInterruptedException. "
+ + "Segment deleted successfully.",
+ segmentName, tableNameWithType);
}
+ LOGGER.error(errorMessage);
+ throw new SegmentIngestionFailureException(errorMessage);
+ }
- SegmentAssignment segmentAssignment =
- SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager,
tableConfig, _controllerMetrics);
- HelixHelper.updateIdealState(_helixZkManager, tableNameWithType,
idealState -> {
- assert idealState != null;
- Map<String, Map<String, String>> currentAssignment =
idealState.getRecord().getMapFields();
- if (currentAssignment.containsKey(segmentName)) {
- LOGGER.warn("Segment: {} already exists in the IdealState for table:
{}, do not update", segmentName,
- tableNameWithType);
- } else {
- List<String> assignedInstances =
- segmentAssignment.assignSegment(segmentName, currentAssignment,
instancePartitionsMap);
- LOGGER.info("Assigning segment: {} to instances: {} for table: {}",
segmentName, assignedInstances,
- tableNameWithType);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
SegmentStateModel.ONLINE));
- }
- return idealState;
- });
- LOGGER.info("Added segment: {} to IdealState for table: {}",
segmentName, tableNameWithType);
- } catch (Exception e) {
- LOGGER.error(
- "Caught exception while adding segment: {} to IdealState for table:
{}, deleting segment ZK metadata",
- segmentName, tableNameWithType, e);
- if (removeSegmentZKMetadata(tableNameWithType, segmentName)) {
- LOGGER.info("Deleted segment ZK metadata for segment: {} of table:
{}", segmentName, tableNameWithType);
+ throw new RuntimeException(originalException);
+ }
+
+ private void assignSegmentInternal(TableConfig tableConfig,
SegmentZKMetadata segmentZKMetadata) {
+ String tableNameWithType = tableConfig.getTableName();
+ String segmentName = segmentZKMetadata.getSegmentName();
+
+ // Assign instances for the segment and add it into IdealState
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
+ // TODO: Support direct tier assignment for UPLOADED real-time segments
+ if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+ instancePartitionsMap = getInstacePartitionsMap(tableConfig,
segmentZKMetadata.getTier());
+ } else {
+ instancePartitionsMap =
fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+ }
+
+ SegmentAssignment segmentAssignment =
+ SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager,
tableConfig, _controllerMetrics);
+ HelixHelper.updateIdealState(_helixZkManager, tableNameWithType,
idealState -> {
+ assert idealState != null;
+ Map<String, Map<String, String>> currentAssignment =
idealState.getRecord().getMapFields();
+ if (currentAssignment.containsKey(segmentName)) {
Review Comment:
Also note: because the retry runs through this `containsKey` check again, it
automatically verifies whether the segment already exists in the IdealState,
which is nice.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]