This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a209f9b0b1 Adding a new list in the propertystore to capture the committing segments. (#15016) a209f9b0b1 is described below commit a209f9b0b16c38e2c1559944b86af6a56ba3a70a Author: 9aman <35227405+9a...@users.noreply.github.com> AuthorDate: Wed Feb 12 15:21:32 2025 +0530 Adding a new list in the propertystore to capture the committing segments. (#15016) * Adding a new list in the propertystore to capture the committing segments. This will make it quicker to fetch the committing segments without reading each segment's ZK metadata * Minor improvements and fixing checkstyle violations * Adding debugInfo API to fetch committing segments * Add support for adding committing segments that could not be added during the commit start protocol * Running addition and deletion of segments from the list in the same ZK call. This aims to simplify the code * Improving logs for debuggability --- .../pinot/common/metadata/ZKMetadataProvider.java | 5 + .../api/resources/PinotRealtimeTableResource.java | 59 ++++++ .../realtime/PinotLLCRealtimeSegmentManager.java | 209 ++++++++++++++++++++- .../RealtimeSegmentValidationManager.java | 23 +++ .../PinotLLCRealtimeSegmentManagerTest.java | 171 +++++++++++++++++ 5 files changed, 460 insertions(+), 7 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 0729533871..37ba1499e3 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -66,6 +66,7 @@ public class ZKMetadataProvider { private static final String CLUSTER_APPLICATION_QUOTAS = "applicationQuotas"; private static final String PROPERTYSTORE_CONTROLLER_JOBS_PREFIX = "/CONTROLLER_JOBS"; private static final String 
PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS"; + private static final String PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX = "/PAUSELESS_DEBUG_METADATA"; private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS"; private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX = "/INSTANCE_PARTITIONS"; private static final String PROPERTYSTORE_DATABASE_CONFIGS_PREFIX = "/CONFIGS/DATABASE"; @@ -246,6 +247,10 @@ public class ZKMetadataProvider { return StringUtil.join("/", PROPERTYSTORE_SEGMENTS_PREFIX, resourceName, segmentName); } + public static String constructPropertyStorePathForPauselessDebugMetadata(String resourceName) { + return StringUtil.join("/", PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX, resourceName); + } + public static String constructPropertyStorePathForSchema(String schemaName) { return StringUtil.join("/", PROPERTYSTORE_SCHEMAS_PREFIX, schemaName); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index e69de66acb..ab9cfe3035 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller.api.resources; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; import io.swagger.annotations.Api; import io.swagger.annotations.ApiKeyAuthDefinition; import io.swagger.annotations.ApiOperation; @@ -29,6 +30,7 @@ import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -46,6 +48,7 @@ import 
javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.utils.DatabaseUtils; @@ -318,6 +321,62 @@ public class PinotRealtimeTableResource { } } + @GET + @Path("/tables/{tableName}/pauselessDebugInfo") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_DEBUG_INFO) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Returns state of pauseless table", notes = + "Gets the segments that are in error state and segments with COMMITTING status in ZK metadata") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), + @ApiResponse(code = 404, message = "Table not found"), + @ApiResponse(code = 500, message = "Internal server error") + }) + public String getPauslessTableDebugInfo( + @ApiParam(value = "Realtime table name with or without type", required = true, example = "myTable | " + + "myTable_REALTIME") @PathParam("tableName") String realtimeTableName, + @Context HttpHeaders headers) { + realtimeTableName = DatabaseUtils.translateTableName(realtimeTableName, headers); + try { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName); + if (TableType.OFFLINE == tableType) { + throw new IllegalStateException("Cannot get consuming segments info for OFFLINE table: " + realtimeTableName); + } + + String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName); + + Map<String, Object> result = new HashMap<>(); + + result.put("instanceToErrorSegmentsMap", getInstanceToErrorSegmentsMap(tableNameWithType)); + + result.put("committingSegments", _pinotLLCRealtimeSegmentManager.getCommittingSegments(tableNameWithType)); + + return 
JsonUtils.objectToPrettyString(result); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to get pauseless debug info for table %s. %s", realtimeTableName, e.getMessage()), + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + private Map<String, Set<String>> getInstanceToErrorSegmentsMap(String tableNameWithType) { + ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "External view does not exist for table: " + tableNameWithType); + + Map<String, Set<String>> instanceToErrorSegmentsMap = new HashMap<>(); + + for (String segmentName : externalView.getPartitionSet()) { + Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); + for (String instance : externalViewStateMap.keySet()) { + if (CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals( + externalViewStateMap.get(instance))) { + instanceToErrorSegmentsMap.computeIfAbsent(instance, unused -> new HashSet<>()).add(segmentName); + } + } + } + return instanceToErrorSegmentsMap; + } + private void validateTable(String tableNameWithType) { IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType); if (idealState == null) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index e296136975..322401e649 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -36,10 +36,12 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Queue; 
import java.util.Random; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -57,6 +59,7 @@ import org.apache.helix.InstanceType; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.assignment.InstancePartitionsUtils; import org.apache.pinot.common.messages.ForceCommitMessage; @@ -124,6 +127,7 @@ import org.apache.pinot.spi.utils.retry.AttemptFailureException; import org.apache.pinot.spi.utils.retry.RetryPolicies; import org.apache.pinot.spi.utils.retry.RetryPolicy; import org.apache.zookeeper.data.Stat; +import org.codehaus.commons.nullanalysis.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -152,6 +156,8 @@ public class PinotLLCRealtimeSegmentManager { public static final String PAUSE_STATE = "pauseState"; private static final Logger LOGGER = LoggerFactory.getLogger(PinotLLCRealtimeSegmentManager.class); + private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f); + public static final String COMMITTING_SEGMENTS = "committingSegments"; private static final int STARTING_SEQUENCE_NUMBER = 0; // Initial sequence number for new table segments private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier"; @@ -437,6 +443,66 @@ public class PinotLLCRealtimeSegmentManager { } } + private boolean addSegmentToCommittingSegmentsList(String realtimeTableName, String segmentName) { + String committingSegmentsListPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); + Stat stat = new Stat(); + 
ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); + int expectedVersion = stat.getVersion(); + LOGGER.info("Committing segments list size: {} before adding the segment: {}", Optional.ofNullable(znRecord) + .map(record -> record.getListField(COMMITTING_SEGMENTS)) + .map(List::size) + .orElse(0), segmentName); + + // empty ZN record for the table + if (znRecord == null) { + znRecord = new ZNRecord(realtimeTableName); + znRecord.setListField(COMMITTING_SEGMENTS, List.of(segmentName)); + return _propertyStore.create(committingSegmentsListPath, znRecord, AccessOption.PERSISTENT); + } + + // segment already present in the list + List<String> committingSegmentList = znRecord.getListField(COMMITTING_SEGMENTS); + if (committingSegmentList != null && committingSegmentList.contains(segmentName)) { + return true; + } + + if (committingSegmentList == null) { + committingSegmentList = List.of(segmentName); + } else { + committingSegmentList.add(segmentName); + } + znRecord.setListField(COMMITTING_SEGMENTS, committingSegmentList); + try { + return _propertyStore.set(committingSegmentsListPath, znRecord, expectedVersion, AccessOption.PERSISTENT); + } catch (ZkBadVersionException e) { + return false; + } + } + + private boolean removeSegmentFromCommittingSegmentsList(String realtimeTableName, String segmentName) { + String committingSegmentsListPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); + Stat stat = new Stat(); + ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); + + LOGGER.info("Committing segments list size: {} before removing the segment: {}", Optional.ofNullable(znRecord) + .map(record -> record.getListField(COMMITTING_SEGMENTS)) + .map(List::size) + .orElse(0), segmentName); + + if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == null || !znRecord.getListField( + COMMITTING_SEGMENTS).contains(segmentName)) 
{ + return true; + } + znRecord.getListField(COMMITTING_SEGMENTS).remove(segmentName); + try { + return _propertyStore.set(committingSegmentsListPath, znRecord, stat.getVersion(), AccessOption.PERSISTENT); + } catch (ZkBadVersionException e) { + return false; + } + } + public IdealState getIdealState(String realtimeTableName) { try { IdealState idealState = HelixHelper.getTableIdealState(_helixManager, realtimeTableName); @@ -611,6 +677,18 @@ public class PinotLLCRealtimeSegmentManager { // No-op } + private boolean updateCommittingSegmentsList(String realtimeTableName, + Callable<Boolean> operation) { + try { + DEFAULT_RETRY_POLICY.attempt(operation); + } catch (Exception e) { + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L); + LOGGER.error("Failed to update committing segments list for table: {}", realtimeTableName, e); + return false; + } + return true; + } + // Step 1: Update committing segment metadata private SegmentZKMetadata updateCommittingSegmentMetadata(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor, boolean isStartMetadata) { @@ -681,9 +759,10 @@ public class PinotLLCRealtimeSegmentManager { * the response is sent to the server to build the segment. * <p> * This method performs the following actions: - * 1. Updates the property store segment metadata status from IN_PROGRESS to COMMITTING. - * 2. Creates a new property store record for the next consuming segment. - * 3. Updates the ideal state to mark the new segment as CONSUMING. + * 1. Adds the segment to the committing segment list (best effort: a failure is logged and the commit proceeds) + * 2. Updates the property store segment metadata status from IN_PROGRESS to COMMITTING. + * 3. Creates a new property store record for the next consuming segment. + * 4. Updates the ideal state to mark the new segment as CONSUMING. 
*/ public void commitSegmentStartMetadata(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) { @@ -693,6 +772,13 @@ public class PinotLLCRealtimeSegmentManager { try { _numCompletingSegments.addAndGet(1); + LOGGER.info("Adding segment: {} to committing segment list", + committingSegmentDescriptor.getSegmentName()); + if (!updateCommittingSegmentsList(realtimeTableName, + () -> addSegmentToCommittingSegmentsList(realtimeTableName, committingSegmentDescriptor.getSegmentName()))) { + LOGGER.error("Failed to update committing segments list for table: {}, segment: {}", realtimeTableName, + committingSegmentDescriptor.getSegmentName()); + } commitSegmentMetadataInternal(realtimeTableName, committingSegmentDescriptor, true); } finally { _numCompletingSegments.addAndGet(-1); @@ -701,7 +787,8 @@ public class PinotLLCRealtimeSegmentManager { /** * Invoked after the realtime segment has been built and uploaded during pauseless ingestion. - * Updates the metadata like CRC, download URL, etc. in the Zookeeper metadata for the committing segment. + * Updates the metadata like CRC, download URL, etc. in the Zookeeper metadata for the committing segment + * and removes the segment from the committing segment list. 
*/ public void commitSegmentEndMetadata(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) { @@ -739,6 +826,14 @@ public class PinotLLCRealtimeSegmentManager { LOGGER.info("Updating segment ZK metadata for segment: {}", committingSegmentName); updateCommittingSegmentMetadata(realtimeTableName, committingSegmentDescriptor, false); LOGGER.info("Successfully updated segment metadata for segment: {}", committingSegmentName); + // remove the segment from the committing segment list + LOGGER.info("Removing segment: {} from committing segment list", + committingSegmentDescriptor.getSegmentName()); + if (!updateCommittingSegmentsList(realtimeTableName, + () -> removeSegmentFromCommittingSegmentsList(realtimeTableName, committingSegmentName))) { + LOGGER.error("Failed to update committing segments list for table: {}, segment: {}", realtimeTableName, + committingSegmentDescriptor.getSegmentName()); + } } finally { _numCompletingSegments.addAndGet(-1); } @@ -968,7 +1063,7 @@ public class PinotLLCRealtimeSegmentManager { instanceName); } return idealState; - }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true); + }, DEFAULT_RETRY_POLICY, true); } catch (Exception e) { _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L); throw e; @@ -1086,7 +1181,7 @@ public class PinotLLCRealtimeSegmentManager { realtimeTableName, isTableEnabled, isTablePaused); return idealState; } - }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true); + }, DEFAULT_RETRY_POLICY, true); } /** @@ -1116,7 +1211,7 @@ public class PinotLLCRealtimeSegmentManager { updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName, isTablePaused(idealState) ? 
null : newSegmentName, segmentAssignment, instancePartitionsMap); return idealState; - }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f)); + }, DEFAULT_RETRY_POLICY); } public static boolean isTablePaused(IdealState idealState) { @@ -2228,4 +2323,104 @@ public class PinotLLCRealtimeSegmentManager { } return segmentsYetToBeCommitted; } + + /** + * Synchronizes the list of committing segments for a realtime table in ZooKeeper by both adding new segments + * and dropping deleted or DONE segments. This function is designed to be called periodically + * to maintain an up-to-date list of actively committing segments. + * + * The synchronization process works as follows: + * 1. For a new table (no existing ZooKeeper record), creates a fresh list with the provided segments + * 2. For an existing table, merges the new segments with the tracked segments, dropping any tracked + * segments whose ZK metadata is missing or whose status is DONE + * 3. Maintains uniqueness of segments using a Set-based deduplication + * + * @param realtimeTableName Name of the realtime table whose committing segments list needs to be synchronized + * @param committingSegments List of new segment names that are currently in COMMITTING state. + * Must not be null; this method does not special-case a null list + * @return true if the synchronization succeeds, false if updating ZooKeeper still fails after retries + * @see #getCommittingSegments for the logic that filters out deleted and DONE segments + */ + public boolean syncCommittingSegments(String realtimeTableName, @NotNull List<String> committingSegments) { + return updateCommittingSegmentsList(realtimeTableName, () -> { + String committingSegmentsListPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); + + // Fetch the committing segments record from the property store. 
+ Stat stat = new Stat(); + ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); + + // empty ZN record for the table + if (znRecord == null) { + znRecord = new ZNRecord(realtimeTableName); + znRecord.setListField(COMMITTING_SEGMENTS, committingSegments); + return _propertyStore.create(committingSegmentsListPath, znRecord, AccessOption.PERSISTENT); + } + + Set<String> mergedSegments = new HashSet<>(committingSegments); + // Get segments that are present in the list and are still in COMMITTING status + List<String> existingSegments = + getCommittingSegments(realtimeTableName, znRecord.getListField(COMMITTING_SEGMENTS)); + if (existingSegments != null) { + mergedSegments.addAll(existingSegments); + } + + znRecord.setListField(COMMITTING_SEGMENTS, new ArrayList<>(mergedSegments)); + return _propertyStore.set(committingSegmentsListPath, znRecord, stat.getVersion(), AccessOption.PERSISTENT); + }); + } + + /** + * Filters and returns a list of committing segments for a realtime table. + * This method excludes segments that are either: + * 1. Missing from ZK metadata (likely deleted) + * 2. 
Already committed (status: DONE) + * + * @param realtimeTableName The name of the realtime table + * @param committingSegmentsFromPropertyStore List of segments from property store, can be null + * @return Filtered list of committing segments, or null if input is null + */ + @Nullable + private List<String> getCommittingSegments(String realtimeTableName, + @Nullable List<String> committingSegmentsFromPropertyStore) { + + if (committingSegmentsFromPropertyStore == null) { + return null; + } + + List<String> committingSegments = new ArrayList<>(); + for (String segment : committingSegmentsFromPropertyStore) { + SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(realtimeTableName, segment); + if (segmentZKMetadata == null || Status.DONE.equals(segmentZKMetadata.getStatus())) { + continue; + } + committingSegments.add(segment); + } + return committingSegments; + } + + /** + * Retrieves and filters the list of committing segments for a realtime table from the property store. + * This method: + * 1. Constructs the ZK path for pauseless debug metadata + * 2. Fetches the committing segments record from the property store + * 3. Filters out segments that are either deleted or already committed + * + * @param realtimeTableName The name of the realtime table to fetch committing segments for + * @return Filtered list of committing segments, or null if no committing segments record exists + * or if the COMMITTING_SEGMENTS field is not present in the ZNRecord + */ + public List<String> getCommittingSegments(String realtimeTableName) { + String committingSegmentsListPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); + + // Fetch the committing segments record from the property store. 
+ Stat stat = new Stat(); + ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); + if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == null) { + return null; + } + + return getCommittingSegments(realtimeTableName, znRecord.getListField(COMMITTING_SEGMENTS)); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 5ea86bd9fb..db832aede6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -20,6 +20,7 @@ package org.apache.pinot.controller.validation; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -28,6 +29,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.ValidationMetrics; +import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.api.resources.PauseStatusDetails; @@ -38,6 +40,7 @@ import org.apache.pinot.spi.config.table.PauseState; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.IngestionConfigUtils; import 
org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; @@ -176,6 +179,12 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea // Update the total document count gauge _validationMetrics.updateTotalDocumentCountGauge(realtimeTableName, computeTotalDocumentCount(segmentsZKMetadata)); + // Ensures all segments in COMMITTING state are properly tracked in ZooKeeper. + // Acts as a recovery mechanism for segments that may have failed to register during start of commit protocol. + if (PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) { + syncCommittingSegmentsFromMetadata(realtimeTableName, segmentsZKMetadata); + } + // Check missing segments and upload them to the deep store if (_llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) { _llcRealtimeSegmentManager.uploadToDeepStoreIfMissing(tableConfig, segmentsZKMetadata); @@ -186,6 +195,20 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea } } + private void syncCommittingSegmentsFromMetadata(String realtimeTableName, + List<SegmentZKMetadata> segmentsZKMetadata) { + List<String> committingSegments = new ArrayList<>(); + for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) { + if (CommonConstants.Segment.Realtime.Status.COMMITTING.equals(segmentZKMetadata.getStatus())) { + committingSegments.add(segmentZKMetadata.getSegmentName()); + } + } + LOGGER.info("Adding committing segments to ZK: {}", committingSegments); + if (!_llcRealtimeSegmentManager.syncCommittingSegments(realtimeTableName, committingSegments)) { + LOGGER.error("Failed to add committing segments for table: {}", realtimeTableName); + } + } + @Override protected void nonLeaderCleanup(List<String> tableNamesWithType) { for (String tableNameWithType : tableNamesWithType) { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index fcbb5c4a68..fd1f9d67c6 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -44,6 +45,7 @@ import java.util.stream.IntStream; import javax.annotation.Nullable; import javax.ws.rs.core.Response; import org.apache.commons.io.FileUtils; +import org.apache.helix.AccessOption; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.model.ExternalView; @@ -53,6 +55,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.exception.HttpErrorStatusException; +import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse; @@ -94,6 +97,7 @@ import org.testng.annotations.Test; import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION; import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS; +import static org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager.COMMITTING_SEGMENTS; import static 
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -1482,6 +1486,173 @@ public class PinotLLCRealtimeSegmentManagerTest { assert ImmutableSet.of("s2", "s4", "s5").equals(segmentsYetToBeCommitted); } + @Test + public void testGetCommittingSegments() + throws HttpErrorStatusException, IOException, URISyntaxException { + // mock the behavior for PinotHelixResourceManager + PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); + HelixManager helixManager = mock(HelixManager.class); + HelixAdmin helixAdmin = mock(HelixAdmin.class); + ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore = + (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class); + when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager); + when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin); + when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME); + when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore); + + // init fake PinotLLCRealtimeSegmentManager + ControllerConf controllerConfig = new ControllerConf(); + FakePinotLLCRealtimeSegmentManager segmentManager = + new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, controllerConfig); + + // Test table name + String realtimeTableName = "githubEvents_2_REALTIME"; + + // Create test segments + List<String> testSegments = List.of( + "githubEvents_2__0__0__20250210T1142Z", + "githubEvents_2__0__1__20250210T1142Z", + "githubEvents_2__0__2__20250210T1142Z", + "githubEvents_2__0__3__20250210T1142Z" + ); + + // mock response of propertyStore + String committingSegmentsListPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); + + ZNRecord znRecord = new ZNRecord(realtimeTableName); + znRecord.setListField(COMMITTING_SEGMENTS, testSegments); + + 
when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT))) + .thenReturn(znRecord); + + // mock response for fetching segmentZKMetadata with different scenarios + // Segment 0: COMMITTING status + SegmentZKMetadata segmentZKMetadata0 = mock(SegmentZKMetadata.class); + when(segmentZKMetadata0.getStatus()).thenReturn(Status.COMMITTING); + when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, testSegments.get(0))) + .thenReturn(segmentZKMetadata0); + + // Segment 1: null metadata (deleted) + when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, testSegments.get(1))) + .thenReturn(null); + + // Segment 2: DONE status + SegmentZKMetadata segmentZKMetadata2 = mock(SegmentZKMetadata.class); + when(segmentZKMetadata2.getStatus()).thenReturn(Status.DONE); + when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, testSegments.get(2))) + .thenReturn(segmentZKMetadata2); + + // Segment 3: COMMITTING status + SegmentZKMetadata segmentZKMetadata3 = mock(SegmentZKMetadata.class); + when(segmentZKMetadata3.getStatus()).thenReturn(Status.COMMITTING); + when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, testSegments.get(3))) + .thenReturn(segmentZKMetadata3); + + // Execute test + List<String> result = segmentManager.getCommittingSegments(realtimeTableName); + + // Verify results + assertNotNull(result); + assertEquals(2, result.size()); + assertTrue(result.contains(testSegments.get(0))); // Should include COMMITTING segment + assertFalse(result.contains(testSegments.get(1))); // Should exclude null metadata segment + assertFalse(result.contains(testSegments.get(2))); // Should exclude DONE segment + assertTrue(result.contains(testSegments.get(3))); // Should include COMMITTING segment + + // Test null case + when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT))) + .thenReturn(null); + result = 
segmentManager.getCommittingSegments(realtimeTableName); + assertNull(result); + + // Test empty COMMITTING_SEGMENTS field + ZNRecord emptyRecord = new ZNRecord("CommittingSegments"); + when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT))) + .thenReturn(emptyRecord); + result = segmentManager.getCommittingSegments(realtimeTableName); + assertNull(result); + } + + @Test + public void testSyncCommittingSegments() throws Exception { + // Covers syncCommittingSegments when no list exists yet, against an existing list (dedup + DONE removal), and on a failing ZK write. Set up mocks for the resource management infrastructure + PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); + HelixManager helixManager = mock(HelixManager.class); + HelixAdmin helixAdmin = mock(HelixAdmin.class); + ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore = + (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class); + + // Configure basic mock behaviors + when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager); + when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin); + when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME); + when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore); + + // Initialize the segment manager + ControllerConf controllerConfig = new ControllerConf(); + FakePinotLLCRealtimeSegmentManager segmentManager = + new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, controllerConfig); + + String realtimeTableName = "testTable_REALTIME"; + String committingSegmentsListPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); + + + // Create test segment names: two COMMITTING segments and one DONE segment + String committingSegment1 = "testTable__0__0__20250210T1142Z"; + String committingSegment2 = "testTable__0__1__20250210T1142Z"; + String doneSegment = "testTable__0__2__20250210T1142Z"; + + // Set up segment metadata mocks so each name resolves to its status + SegmentZKMetadata committingMetadata1 = mock(SegmentZKMetadata.class); + 
when(committingMetadata1.getStatus()).thenReturn(Status.COMMITTING); + + SegmentZKMetadata committingMetadata2 = mock(SegmentZKMetadata.class); + when(committingMetadata2.getStatus()).thenReturn(Status.COMMITTING); + + SegmentZKMetadata doneMetadata = mock(SegmentZKMetadata.class); + when(doneMetadata.getStatus()).thenReturn(Status.DONE); + + when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, committingSegment1)).thenReturn( + committingMetadata1); + when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, committingSegment2)).thenReturn( + committingMetadata2); + when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, doneSegment)).thenReturn(doneMetadata); + + // Test 1: No list exists yet (get() returns null), so the list is created via create(); both new segments are COMMITTING + List<String> newSegments = Arrays.asList(committingSegment1, committingSegment2); + when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT))) + .thenReturn(null); + when(zkHelixPropertyStore.create(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT))) + .thenReturn(true); + + assertTrue(segmentManager.syncCommittingSegments(realtimeTableName, newSegments)); + + // Test 2: Syncing against an existing list that already holds committingSegment2 and a DONE segment; the update goes through set() + ZNRecord existingRecord = new ZNRecord(realtimeTableName); + existingRecord.setListField(COMMITTING_SEGMENTS, + Arrays.asList(committingSegment2, doneSegment)); + + when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT))) + .thenReturn(existingRecord); + when(zkHelixPropertyStore.set(eq(committingSegmentsListPath), any(), anyInt(), eq(AccessOption.PERSISTENT))) + .thenReturn(true); + + // There should not be any duplicates and the doneSegment should be removed from the list (NOTE(review): the check reads existingRecord itself, so it presumably relies on the fetched ZNRecord 
being updated in place - confirm) + assertTrue(segmentManager.syncCommittingSegments(realtimeTableName, + Arrays.asList(committingSegment1, committingSegment2))); + assertEquals(new 
HashSet<>(existingRecord.getListField(COMMITTING_SEGMENTS)), + new HashSet<>(List.of(committingSegment1, committingSegment2))); + + + // Test 3: The ZK set() call is stubbed to throw; the sync is expected to report failure by returning false rather than propagating + when(zkHelixPropertyStore.set(eq(committingSegmentsListPath), any(), anyInt(), eq(AccessOption.PERSISTENT))) + .thenThrow(new RuntimeException("ZooKeeper operation failed")); + assertFalse(segmentManager.syncCommittingSegments(realtimeTableName, newSegments)); + } + + ////////////////////////////////////////////////////////////////////////////////// // Fake classes ///////////////////////////////////////////////////////////////////////////////// --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org