Jackie-Jiang commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1471662750
########## pinot-tools/src/main/java/org/apache/pinot/tools/RealTimeSlowConsumer.java: ########## @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.tools; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.controller.helix.ControllerRequestClient; +import org.apache.pinot.spi.stream.StreamDataServerStartable; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; +import org.apache.pinot.tools.admin.PinotAdministrator; +import org.apache.pinot.tools.admin.command.QuickstartRunner; +import org.apache.pinot.tools.utils.KafkaStarterUtils; + + +public class RealTimeSlowConsumer extends QuickStartBase { + private static final String DEFAULT_CONTROLLER_URL = "http://localhost:9000"; + + @Override + protected Map<String, String> 
getDefaultStreamTableDirectories() { + return ImmutableMap.<String, String>builder() Review Comment: The indentation is incorrect. Please apply [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide) and reformat the changes ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -889,6 +892,39 @@ public Map<String, PartitionLagState> getPartitionToLagState( return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap); } + /** + * Checks if the stream partition is in a valid state. + * + * The type of checks is dependent on the stream type. An example is if the startOffset has expired due to + * retention configuration of the stream which may lead to missed data. + * + * @param startOffset The offset of the first message desired, inclusive + */ + private void validateStartOffset(StreamPartitionMsgOffset startOffset) { + if (_partitionMetadataProvider == null) { + createPartitionMetadataProvider("validateStartOffset"); + } + + try { + StreamPartitionMsgOffset streamSmallestOffset = _partitionMetadataProvider.fetchStreamPartitionOffset( + OffsetCriteria.SMALLEST_OFFSET_CRITERIA, + /*maxWaitTimeMs=*/5000 + ); Review Comment: Use `fetchEarliestStreamOffset(5000)`? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1339,7 +1346,14 @@ private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffsetInSegmentZkMetadata, streamSmallestOffset, partitionGroupId, tableName); _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); - return streamSmallestOffset; + String message = "Data lost from offset: " + startOffsetInSegmentZkMetadata + + " to: " + streamSmallestOffset + + " for partition: " + partitionGroupId + + " of table: " + tableName; + + _errorCache.put(Pair.of(tableName, segmentName), + new SegmentErrorInfo(String.valueOf(System.currentTimeMillis()), message, "")); Review Comment: ```suggestion new SegmentErrorInfo(System.currentTimeMillis(), message, null)); ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1711,4 +1725,14 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + + public Map<String, SegmentErrorInfo> getSegmentErrors(String tableNameWithType) { Review Comment: Why not store a two-level map: `Map<String, Map<String, SegmentErrorInfo>>`? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -140,6 +142,8 @@ public class PinotLLCRealtimeSegmentManager { // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller. 
private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L; + private Map<Pair<String, String>, SegmentErrorInfo> _errorCache; Review Comment: This is not really a cache because we will never clean up the entries as of now. Do you plan to add the cleanup logic in this PR? If not, let's add a TODO at least ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -889,6 +892,39 @@ public Map<String, PartitionLagState> getPartitionToLagState( return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap); } + /** + * Checks if the stream partition is in a valid state. + * + * The type of checks is dependent on the stream type. An example is if the startOffset has expired due to + * retention configuration of the stream which may lead to missed data. + * + * @param startOffset The offset of the first message desired, inclusive + */ + private void validateStartOffset(StreamPartitionMsgOffset startOffset) { + if (_partitionMetadataProvider == null) { + createPartitionMetadataProvider("validateStartOffset"); + } + + try { + StreamPartitionMsgOffset streamSmallestOffset = _partitionMetadataProvider.fetchStreamPartitionOffset( + OffsetCriteria.SMALLEST_OFFSET_CRITERIA, + /*maxWaitTimeMs=*/5000 + ); + if (streamSmallestOffset.compareTo(startOffset) > 0) { + _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L); + String message = "startOffset(" + startOffset + + ") is older than topic's beginning offset(" + streamSmallestOffset + ")"; + _segmentLogger.error(message); + _realtimeTableDataManager.addSegmentError(_segmentNameStr, + new SegmentErrorInfo(String.valueOf(now()), message, "") + ); + } + } catch (TimeoutException tce) { Review Comment: We should catch all exceptions to avoid this check breaking the ingestion (Kinesis doesn't support this method at all). 
You may use `fetchEarliestStreamOffset()`, which already handles this. ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -889,6 +892,39 @@ public Map<String, PartitionLagState> getPartitionToLagState( return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap); } + /** + * Checks if the stream partition is in a valid state. + * + * The type of checks is dependent on the stream type. An example is if the startOffset has expired due to + * retention configuration of the stream which may lead to missed data. + * + * @param startOffset The offset of the first message desired, inclusive + */ + private void validateStartOffset(StreamPartitionMsgOffset startOffset) { + if (_partitionMetadataProvider == null) { + createPartitionMetadataProvider("validateStartOffset"); + } + + try { + StreamPartitionMsgOffset streamSmallestOffset = _partitionMetadataProvider.fetchStreamPartitionOffset( + OffsetCriteria.SMALLEST_OFFSET_CRITERIA, + /*maxWaitTimeMs=*/5000 + ); + if (streamSmallestOffset.compareTo(startOffset) > 0) { + _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L); + String message = "startOffset(" + startOffset + + ") is older than topic's beginning offset(" + streamSmallestOffset + ")"; + _segmentLogger.error(message); + _realtimeTableDataManager.addSegmentError(_segmentNameStr, + new SegmentErrorInfo(String.valueOf(now()), message, "") Review Comment: ```suggestion new SegmentErrorInfo(now(), message, null) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org