Jackie-Jiang commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940355486
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -442,6 +446,10 @@ public IdealState getIdealState(String realtimeTableName) { } } + public ExternalView getExternalView(String realtimeTableName) { + return _helixResourceManager.getTableExternalView(realtimeTableName); + } + Review Comment: (minor) Let's inline this and remove this method. The behavior is not consistent with `getIdealState()` which can cause confusion ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reIngestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. + * + * API signature: + * POST http://[serverURL]/reIngestSegment + * Request body (JSON): + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + * + * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME" + */ + public void reIngestSegmentsWithErrorState(String tableNameWithType) { Review Comment: (minor) When the table type is known, let's directly use the type for readability ```suggestion public void reIngestSegmentsWithErrorState(String realtimeTableName) { ``` ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -1243,7 +1243,7 @@ protected boolean buildSegmentAndReplace() return true; } - private void closeStreamConsumers() { + protected void closeStreamConsumers() { Review Comment: (minor) Seems not needed, or do you plan to add a test? 
########## pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java: ########## @@ -1274,6 +1275,39 @@ public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider authProvi httpHeaders, maxStreamRateInByte); } + /** + * Invokes the server's reIngestSegment API via a POST request with JSON payload, + * using Simple HTTP APIs. + * + * POST http://[serverURL]/reIngestSegment Review Comment: Suggest just making it `POST /reIngestSegment/{segmentName}` which is easier to use. ########## pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java: ########## @@ -1274,6 +1275,39 @@ public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider authProvi httpHeaders, maxStreamRateInByte); } + /** + * Invokes the server's reIngestSegment API via a POST request with JSON payload, + * using Simple HTTP APIs. + * + * POST http://[serverURL]/reIngestSegment + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + */ + public void triggerReIngestion(String serverHostPort, String tableNameWithType, String segmentName) Review Comment: (minor) table name is implicit from the LLC segment name, and we may simplify this API ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reIngestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. 
+ * + * API signature: + * POST http://[serverURL]/reIngestSegment + * Request body (JSON): + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + * + * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME" + */ + public void reIngestSegmentsWithErrorState(String tableNameWithType) { + // Step 1: Fetch the ExternalView and all segments + ExternalView externalView = getExternalView(tableNameWithType); + IdealState idealState = getIdealState(tableNameWithType); + Map<String, Map<String, String>> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields(); + Map<String, Map<String, String>> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields(); + + // find segments in ERROR state in externalView + List<String> segmentsInErrorState = new ArrayList<>(); + for (Map.Entry<String, Map<String, String>> entry : segmentToInstanceCurrentStateMap.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> instanceStateMap = entry.getValue(); + boolean allReplicasInError = true; + for (String state : instanceStateMap.values()) { + if (!SegmentStateModel.ERROR.equals(state)) { + allReplicasInError = false; + break; + } + } + if (allReplicasInError) { + segmentsInErrorState.add(segmentName); + } + } + + if (segmentsInErrorState.isEmpty()) { + LOGGER.info("No segments found in ERROR state for table {}", tableNameWithType); + return; + } + + // filter out segments that are not ONLINE in IdealState + for (String segmentName : segmentsInErrorState) { + Map<String, String> instanceIdealStateMap = segmentToInstanceIdealStateMap.get(segmentName); + boolean isOnline = true; + for (String state : instanceIdealStateMap.values()) { + if (!SegmentStateModel.ONLINE.equals(state)) { + isOnline = false; + break; + } + } + if (!isOnline) { + segmentsInErrorState.remove(segmentName); Review Comment: Can you modify while looping over the list? 
I believe this will throw a `ConcurrentModificationException`. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reIngestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. + * + * API signature: + * POST http://[serverURL]/reIngestSegment + * Request body (JSON): + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + * + * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME" + */ + public void reIngestSegmentsWithErrorState(String tableNameWithType) { + // Step 1: Fetch the ExternalView and all segments + ExternalView externalView = getExternalView(tableNameWithType); + IdealState idealState = getIdealState(tableNameWithType); + Map<String, Map<String, String>> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields(); + Map<String, Map<String, String>> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields(); + + // find segments in ERROR state in externalView + List<String> segmentsInErrorState = new ArrayList<>(); + for (Map.Entry<String, Map<String, String>> entry : segmentToInstanceCurrentStateMap.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> instanceStateMap = entry.getValue(); + boolean allReplicasInError = true; + for (String state : instanceStateMap.values()) { + if (!SegmentStateModel.ERROR.equals(state)) { + allReplicasInError = false; + break; + } + } + if (allReplicasInError) { + segmentsInErrorState.add(segmentName); + } + } + + if (segmentsInErrorState.isEmpty()) { + LOGGER.info("No segments 
found in ERROR state for table {}", tableNameWithType); + return; + } + + // filter out segments that are not ONLINE in IdealState + for (String segmentName : segmentsInErrorState) { + Map<String, String> instanceIdealStateMap = segmentToInstanceIdealStateMap.get(segmentName); + boolean isOnline = true; + for (String state : instanceIdealStateMap.values()) { + if (!SegmentStateModel.ONLINE.equals(state)) { + isOnline = false; + break; + } + } + if (!isOnline) { + segmentsInErrorState.remove(segmentName); + } + } + + // Step 2: For each segment, check the ZK metadata for conditions + for (String segmentName : segmentsInErrorState) { + // Skip non-LLC segments or segments missing from the ideal state altogether + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); Review Comment: (minor) I'd check this before checking ERROR segment because this one is cheaper ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reIngestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. + * + * API signature: + * POST http://[serverURL]/reIngestSegment + * Request body (JSON): + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + * + * @param tableNameWithType The table name with type, e.g. 
"myTable_REALTIME" + */ + public void reIngestSegmentsWithErrorState(String tableNameWithType) { + // Step 1: Fetch the ExternalView and all segments + ExternalView externalView = getExternalView(tableNameWithType); + IdealState idealState = getIdealState(tableNameWithType); + Map<String, Map<String, String>> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields(); + Map<String, Map<String, String>> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields(); + + // find segments in ERROR state in externalView + List<String> segmentsInErrorState = new ArrayList<>(); + for (Map.Entry<String, Map<String, String>> entry : segmentToInstanceCurrentStateMap.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> instanceStateMap = entry.getValue(); + boolean allReplicasInError = true; Review Comment: When there are `ONLINE` replica, ideally we should reset the `ERROR` replica. Do we rely on validation manager for that? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reIngestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. + * + * API signature: + * POST http://[serverURL]/reIngestSegment + * Request body (JSON): + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + * + * @param tableNameWithType The table name with type, e.g. 
"myTable_REALTIME" + */ + public void reIngestSegmentsWithErrorState(String tableNameWithType) { + // Step 1: Fetch the ExternalView and all segments + ExternalView externalView = getExternalView(tableNameWithType); + IdealState idealState = getIdealState(tableNameWithType); + Map<String, Map<String, String>> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields(); + Map<String, Map<String, String>> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields(); + + // find segments in ERROR state in externalView + List<String> segmentsInErrorState = new ArrayList<>(); + for (Map.Entry<String, Map<String, String>> entry : segmentToInstanceCurrentStateMap.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> instanceStateMap = entry.getValue(); + boolean allReplicasInError = true; + for (String state : instanceStateMap.values()) { + if (!SegmentStateModel.ERROR.equals(state)) { + allReplicasInError = false; + break; + } + } + if (allReplicasInError) { + segmentsInErrorState.add(segmentName); + } + } + + if (segmentsInErrorState.isEmpty()) { + LOGGER.info("No segments found in ERROR state for table {}", tableNameWithType); + return; + } + + // filter out segments that are not ONLINE in IdealState + for (String segmentName : segmentsInErrorState) { + Map<String, String> instanceIdealStateMap = segmentToInstanceIdealStateMap.get(segmentName); + boolean isOnline = true; + for (String state : instanceIdealStateMap.values()) { + if (!SegmentStateModel.ONLINE.equals(state)) { + isOnline = false; + break; + } + } + if (!isOnline) { + segmentsInErrorState.remove(segmentName); + } + } + + // Step 2: For each segment, check the ZK metadata for conditions + for (String segmentName : segmentsInErrorState) { + // Skip non-LLC segments or segments missing from the ideal state altogether + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + if (llcSegmentName == null) { + continue; + } + + SegmentZKMetadata 
segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName); + // We only consider segments that are in COMMITTING state + if (segmentZKMetadata.getStatus() == Status.COMMITTING) { + Map<String, String> instanceStateMap = segmentToInstanceIdealStateMap.get(segmentName); + + // Step 3: “No peer has that segment.” => Re-ingest from one server that is supposed to host it and is alive + LOGGER.info( + "Segment {} in table {} is COMMITTING with missing download URL and no peer copy. Triggering re-ingestion.", + segmentName, tableNameWithType); + + // Find at least one server that should host this segment and is alive + String aliveServer = findAliveServerToReIngest(instanceStateMap.keySet()); + if (aliveServer == null) { + LOGGER.warn("No alive server found to re-ingest segment {} in table {}", segmentName, tableNameWithType); + continue; + } + + try { + _fileUploadDownloadClient.triggerReIngestion(aliveServer, tableNameWithType, segmentName); Review Comment: This doesn't belong to upload/download category. Can you add this method to a different class? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reIngestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. + * + * API signature: + * POST http://[serverURL]/reIngestSegment + * Request body (JSON): + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + * + * @param tableNameWithType The table name with type, e.g. 
"myTable_REALTIME" + */ + public void reIngestSegmentsWithErrorState(String tableNameWithType) { + // Step 1: Fetch the ExternalView and all segments + ExternalView externalView = getExternalView(tableNameWithType); Review Comment: `externalView` could be `null`. We need null check then return ########## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java: ########## @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources.reingestion.utils; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager; +import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; +import org.apache.pinot.segment.local.segment.creator.TransformPipeline; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.spi.MutableSegment; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; +import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentZKPropsConfig; +import 
org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamDataDecoder; +import org.apache.pinot.spi.stream.StreamDataDecoderImpl; +import org.apache.pinot.spi.stream.StreamDataDecoderResult; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Simplified Segment Data Manager for ingesting data from a start offset to an end offset. + */ +public class StatelessRealtimeSegmentDataManager extends SegmentDataManager { Review Comment: Can we move this class to `segment-local` so that it can be reused by minion? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. 
This method + * will call the server reIngestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. + * + * API signature: + * POST http://[serverURL]/reIngestSegment + * Request body (JSON): + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + * + * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME" + */ + public void reIngestSegmentsWithErrorState(String tableNameWithType) { + // Step 1: Fetch the ExternalView and all segments + ExternalView externalView = getExternalView(tableNameWithType); + IdealState idealState = getIdealState(tableNameWithType); + Map<String, Map<String, String>> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields(); + Map<String, Map<String, String>> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields(); + + // find segments in ERROR state in externalView + List<String> segmentsInErrorState = new ArrayList<>(); + for (Map.Entry<String, Map<String, String>> entry : segmentToInstanceCurrentStateMap.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> instanceStateMap = entry.getValue(); + boolean allReplicasInError = true; + for (String state : instanceStateMap.values()) { + if (!SegmentStateModel.ERROR.equals(state)) { + allReplicasInError = false; + break; + } + } + if (allReplicasInError) { + segmentsInErrorState.add(segmentName); + } + } + + if (segmentsInErrorState.isEmpty()) { + LOGGER.info("No segments found in ERROR state for table {}", tableNameWithType); + return; + } + + // filter out segments that are not ONLINE in IdealState + for (String segmentName : segmentsInErrorState) { + Map<String, String> instanceIdealStateMap = segmentToInstanceIdealStateMap.get(segmentName); + boolean isOnline = true; + for (String state : instanceIdealStateMap.values()) { + if (!SegmentStateModel.ONLINE.equals(state)) { + isOnline = false; + break; + } + } + if (!isOnline) { + 
segmentsInErrorState.remove(segmentName); + } + } + + // Step 2: For each segment, check the ZK metadata for conditions + for (String segmentName : segmentsInErrorState) { + // Skip non-LLC segments or segments missing from the ideal state altogether + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + if (llcSegmentName == null) { + continue; + } + + SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName); + // We only consider segments that are in COMMITTING state + if (segmentZKMetadata.getStatus() == Status.COMMITTING) { + Map<String, String> instanceStateMap = segmentToInstanceIdealStateMap.get(segmentName); + + // Step 3: “No peer has that segment.” => Re-ingest from one server that is supposed to host it and is alive + LOGGER.info( + "Segment {} in table {} is COMMITTING with missing download URL and no peer copy. Triggering re-ingestion.", + segmentName, tableNameWithType); + + // Find at least one server that should host this segment and is alive + String aliveServer = findAliveServerToReIngest(instanceStateMap.keySet()); Review Comment: We shouldn't need this check. If a server is `ERROR` in EV, it should be live. We also want to randomly pick the server to avoid creating hotspot ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reIngestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. 
+ * + * API signature: + * POST http://[serverURL]/reIngestSegment + * Request body (JSON): + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + * + * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME" + */ + public void reIngestSegmentsWithErrorState(String tableNameWithType) { + // Step 1: Fetch the ExternalView and all segments + ExternalView externalView = getExternalView(tableNameWithType); + IdealState idealState = getIdealState(tableNameWithType); + Map<String, Map<String, String>> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields(); + Map<String, Map<String, String>> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields(); + + // find segments in ERROR state in externalView + List<String> segmentsInErrorState = new ArrayList<>(); + for (Map.Entry<String, Map<String, String>> entry : segmentToInstanceCurrentStateMap.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> instanceStateMap = entry.getValue(); + boolean allReplicasInError = true; + for (String state : instanceStateMap.values()) { + if (!SegmentStateModel.ERROR.equals(state)) { + allReplicasInError = false; + break; + } + } + if (allReplicasInError) { + segmentsInErrorState.add(segmentName); + } + } + + if (segmentsInErrorState.isEmpty()) { + LOGGER.info("No segments found in ERROR state for table {}", tableNameWithType); + return; + } + + // filter out segments that are not ONLINE in IdealState + for (String segmentName : segmentsInErrorState) { + Map<String, String> instanceIdealStateMap = segmentToInstanceIdealStateMap.get(segmentName); + boolean isOnline = true; + for (String state : instanceIdealStateMap.values()) { + if (!SegmentStateModel.ONLINE.equals(state)) { + isOnline = false; + break; + } + } + if (!isOnline) { + segmentsInErrorState.remove(segmentName); Review Comment: You may merge this loop and the next one, and simply `continue` here ########## 
pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java: ########## @@ -100,6 +100,14 @@ public AuthProvider getAuthProvider() { return _authProvider; } + public String getProtocol() { + return _protocol; + } + + public Integer getControllerHttpsPort() { + return _controllerHttpsPort; + } + Review Comment: The logic of handling upload re-ingested segment should be within this class. Can you move the logic? ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -638,6 +637,16 @@ private long getDownloadTimeOutMilliseconds(@Nullable TableConfig tableConfig) { }).orElse(DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS); } + protected RealtimeSegmentDataManager createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata, Review Comment: (minor) Annotate `@VisibleForTesting` ########## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java: ########## @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources.reingestion.utils; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager; +import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; +import org.apache.pinot.segment.local.segment.creator.TransformPipeline; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.spi.MutableSegment; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; +import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentZKPropsConfig; +import 
org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamDataDecoder; +import org.apache.pinot.spi.stream.StreamDataDecoderImpl; +import org.apache.pinot.spi.stream.StreamDataDecoderResult; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Simplified Segment Data Manager for ingesting data from a start offset to an end offset. + */ +public class StatelessRealtimeSegmentDataManager extends SegmentDataManager { Review Comment: I don't think we need a data manager here. 
We are not using it to serve queries, so I'd suggest simplifying this class and keeping only the ingestion-related part ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java: ########## @@ -389,15 +390,21 @@ protected void replaceSegmentIfCrcMismatch(SegmentDataManager segmentDataManager IndexLoadingConfig indexLoadingConfig) throws Exception { String segmentName = segmentDataManager.getSegmentName(); - Preconditions.checkState(segmentDataManager instanceof ImmutableSegmentDataManager, - "Cannot replace CONSUMING segment: %s in table: %s", segmentName, _tableNameWithType); - SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata(); - if (hasSameCRC(zkMetadata, localMetadata)) { - _logger.info("Segment: {} has CRC: {} same as before, not replacing it", segmentName, localMetadata.getCrc()); - return; + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + // For pauseless tables, we should replace the segment if download url is missing even if crc is same + // Without this the reingestion of ERROR segments in pauseless tables fails + // as the segment data manager is still an instance of RealtimeSegmentDataManager + if (!PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) { Review Comment: When will crc match? We shouldn't replace a segment multiple times (e.g. somehow 2 servers trying to re-ingest) ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/FailureInjectingRealtimeSegmentDataManager.java: ########## @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.realtime; + +import java.util.concurrent.Semaphore; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.apache.pinot.spi.utils.retry.RetriableOperationException; + + +/** + * A specialized RealtimeSegmentDataManager that lets us inject a forced failure + * in the commit step, which occurs strictly after the segmentConsumed message. + */ +public class FailureInjectingRealtimeSegmentDataManager extends RealtimeSegmentDataManager { Review Comment: (minor) Keep all failure injecting class in the same package? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org