krishan1390 commented on code in PR #15563: URL: https://github.com/apache/pinot/pull/15563#discussion_r2052300098
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PauseStatusDetails.java: ########## @@ -65,4 +66,11 @@ public String getComment() { public String getTimestamp() { return _timestamp; } + + @Override + public String toString() { Review Comment: Added this so that tests can log the pause status details, which makes them easier to debug. ########## pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java: ########## @@ -824,6 +831,86 @@ public void enableResourceConfigForLeadControllerResource(boolean enable) { } } + public void runRealtimeSegmentValidationTask(String tableName) + throws IOException { + runPeriodicTask("RealtimeSegmentValidationManager", tableName, "REALTIME"); + } + + public void runPeriodicTask(String taskName, String tableName, String tableType) + throws IOException { + sendGetRequest(getControllerRequestURLBuilder().forPeriodTaskRun(taskName, tableName, tableType)); + } + + public void pauseTable(String tableName) Review Comment: These methods are shifted from KafkaIncreaseDecreasePartitionsIntegrationTest to make them available for other tests. 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1651,13 +1655,17 @@ private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig str } private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset( - List<StreamConfig> streamConfigs) { + List<StreamConfig> streamConfigs, IdealState idealState) { Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>(); for (StreamConfig streamConfig : streamConfigs) { + + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); List<PartitionGroupMetadata> partitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList()); + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); Review Comment: If we send an empty list to getNewPartitionGroupMetadataList, the function assumes that no shards are currently being consumed. For Kinesis, we need to completely consume a parent shard before starting consumption for a child shard. So if the parent shard doesn't exist, the function doesn't return the child shards, because it asks us to consume the parent shard first. 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -994,15 +994,17 @@ Set<Integer> getPartitionIds(StreamConfig streamConfig) @VisibleForTesting Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) { Set<Integer> partitionIds = new HashSet<>(); - boolean allPartitionIdsFetched = true; + boolean allPartitionIdsFetched = false; for (int i = 0; i < streamConfigs.size(); i++) { final int index = i; try { partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream() .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index)) .collect(Collectors.toSet())); + allPartitionIdsFetched = true; + } catch (UnsupportedOperationException ignored) { + // Stream does not support fetching partition ids. There is a log in the fallback code which is sufficient Review Comment: Not logging the exception in such cases to avoid noise ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java: ########## @@ -16,125 +16,65 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.integration.tests; - -import cloud.localstack.Localstack; -import cloud.localstack.ServiceName; -import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor; -import cloud.localstack.docker.annotation.LocalstackDockerConfiguration; -import cloud.localstack.docker.annotation.LocalstackDockerProperties; -import cloud.localstack.docker.command.Command; +package org.apache.pinot.integration.tests.realtime.ingestion; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeType; import com.google.common.base.Function; import java.io.BufferedReader; -import java.io.File; import java.io.InputStream; import java.io.InputStreamReader; -import java.net.URI; -import java.net.URL; import java.nio.charset.StandardCharsets; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import javax.activation.UnsupportedDataTypeException; import javax.annotation.Nullable; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.client.ResultSet; -import org.apache.pinot.plugin.stream.kinesis.KinesisConfig; -import org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.StringUtil; -import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.util.TestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.testng.SkipException; import org.testng.annotations.AfterClass; 
import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.SdkBytes; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.apache.ApacheSdkHttpService; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisClient; -import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; -import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; -import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; -import software.amazon.awssdk.utils.AttributeMap; -@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag = "2.3.2") -public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSet { +public class RealtimeKinesisIntegrationTest extends BaseKinesisIntegrationTest { Review Comment: A lot of the common code for setting up Kinesis has been moved to BaseKinesisIntegrationTest. ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java: ########## @@ -38,41 +36,6 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest extends BaseRealtime private static final String KAFKA_TOPIC = "meetup"; private static final int NUM_PARTITIONS = 1; - String getExternalView(String tableName) Review Comment: These methods have been moved to ControllerTest. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org