krishan1390 commented on code in PR #15563:
URL: https://github.com/apache/pinot/pull/15563#discussion_r2052300098


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PauseStatusDetails.java:
##########
@@ -65,4 +66,11 @@ public String getComment() {
   public String getTimestamp() {
     return _timestamp;
   }
+
+  @Override
+  public String toString() {

Review Comment:
   Added this so tests can log the pause status details, which makes them easier to debug.
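   For reference, a minimal sketch of what the override could look like (only `_comment` and `_timestamp` are visible in this diff; the other field names are assumptions for illustration):

```java
// Hypothetical, simplified stand-in for PauseStatusDetails; only _comment and
// _timestamp appear in the diff above, the rest is assumed for illustration.
public class PauseStatusDetailsSketch {
  private final boolean _pauseFlag;
  private final String _comment;
  private final String _timestamp;

  public PauseStatusDetailsSketch(boolean pauseFlag, String comment, String timestamp) {
    _pauseFlag = pauseFlag;
    _comment = comment;
    _timestamp = timestamp;
  }

  @Override
  public String toString() {
    // A compact one-line form is enough for test logs and assertion messages.
    return "PauseStatusDetails{pauseFlag=" + _pauseFlag + ", comment='" + _comment + "', timestamp='"
        + _timestamp + "'}";
  }
}
```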



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java:
##########
@@ -824,6 +831,86 @@ public void enableResourceConfigForLeadControllerResource(boolean enable) {
     }
   }
 
+  public void runRealtimeSegmentValidationTask(String tableName)
+      throws IOException {
+    runPeriodicTask("RealtimeSegmentValidationManager", tableName, "REALTIME");
+  }
+
+  public void runPeriodicTask(String taskName, String tableName, String tableType)
+      throws IOException {
+    sendGetRequest(getControllerRequestURLBuilder().forPeriodTaskRun(taskName, tableName, tableType));
+  }
+
+  public void pauseTable(String tableName)

Review Comment:
   These methods were moved here from KafkaIncreaseDecreasePartitionsIntegrationTest to make them available to other tests.
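   For context, the moved pause helper can be a thin wrapper over the existing HTTP helpers, roughly along these lines (`forPauseConsumption` is a hypothetical URL-builder method here; the real helper may have a different name or overload):

```java
// Rough sketch only; forPauseConsumption is assumed, and the real moved method
// may hit a different endpoint or use a different sendPostRequest overload.
public void pauseTable(String tableName)
    throws IOException {
  sendPostRequest(getControllerRequestURLBuilder().forPauseConsumption(tableName));
}
```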



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1651,13 +1655,17 @@ private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig str
   }
 
   private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(
-      List<StreamConfig> streamConfigs) {
+      List<StreamConfig> streamConfigs, IdealState idealState) {
     Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
     for (StreamConfig streamConfig : streamConfigs) {
+
+      List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+          getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+
       OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
       streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
       List<PartitionGroupMetadata> partitionGroupMetadataList =
-          getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList());
+          getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList);

Review Comment:
   If we pass an empty list to getNewPartitionGroupMetadataList, the function assumes that no shards are currently being consumed.
   
   For Kinesis, a parent shard must be completely consumed before consumption of a child shard can start.
   
   So if the parent shard doesn't exist, the function doesn't return the child shards, because it asks us to consume the parent shard first.
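   To make the gating concrete, here is a simplified, hypothetical illustration of the rule (not the actual Kinesis connector classes or logic): a child shard only becomes eligible once its parent is known to be fully consumed, so an empty "currently consumed" view hides the children.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical types and method; only meant to illustrate the parent/child
// gating described above, not the real Kinesis connector implementation.
public class ShardGatingSketch {

  static final class Shard {
    final String _shardId;
    final String _parentShardId; // null for a root shard

    Shard(String shardId, String parentShardId) {
      _shardId = shardId;
      _parentShardId = parentShardId;
    }
  }

  /**
   * Returns the shards eligible for consumption. A child shard is returned only
   * when its parent is already fully consumed; with an empty set, only root
   * (parent) shards are returned, which mirrors what the empty list caused.
   */
  static List<Shard> eligibleShards(List<Shard> allShards, Set<String> fullyConsumedShardIds) {
    List<Shard> eligible = new ArrayList<>();
    for (Shard shard : allShards) {
      boolean isRoot = shard._parentShardId == null;
      if (isRoot || fullyConsumedShardIds.contains(shard._parentShardId)) {
        eligible.add(shard);
      }
    }
    return eligible;
  }
}
```

   Passing the ideal-state-derived consumption status plays the role of `fullyConsumedShardIds` above: without it, already-finished parents look unconsumed and their children are withheld.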



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -994,15 +994,17 @@ Set<Integer> getPartitionIds(StreamConfig streamConfig)
   @VisibleForTesting
  Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) {
     Set<Integer> partitionIds = new HashSet<>();
-    boolean allPartitionIdsFetched = true;
+    boolean allPartitionIdsFetched = false;
     for (int i = 0; i < streamConfigs.size(); i++) {
       final int index = i;
       try {
         partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
            .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index))
             .collect(Collectors.toSet()));
+        allPartitionIdsFetched = true;
+      } catch (UnsupportedOperationException ignored) {
+        // Stream does not support fetching partition ids. There is a log in the fallback code which is sufficient

Review Comment:
   Not logging the exception in such cases to avoid noise
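   As a generic illustration of the pattern (with a hypothetical fallback supplier, since the fallback code itself is outside this hunk):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Generic sketch of "probe each stream, swallow UnsupportedOperationException,
// let the fallback log"; not the actual PinotLLCRealtimeSegmentManager code.
public class PartitionIdFetchSketch {

  static Set<Integer> fetchPartitionIds(List<Supplier<Set<Integer>>> sources,
      Supplier<Set<Integer>> loggingFallback) {
    Set<Integer> partitionIds = new HashSet<>();
    boolean anyFetched = false;
    for (Supplier<Set<Integer>> source : sources) {
      try {
        partitionIds.addAll(source.get());
        anyFetched = true;
      } catch (UnsupportedOperationException ignored) {
        // Expected when a stream has no partition-count API; intentionally not
        // logged here so periodic runs stay quiet.
      }
    }
    // The fallback is assumed to do its own logging, as noted in the diff.
    return anyFetched ? partitionIds : loggingFallback.get();
  }
}
```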



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java:
##########
@@ -16,125 +16,65 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.integration.tests;
-
-import cloud.localstack.Localstack;
-import cloud.localstack.ServiceName;
-import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor;
-import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
-import cloud.localstack.docker.annotation.LocalstackDockerProperties;
-import cloud.localstack.docker.command.Command;
+package org.apache.pinot.integration.tests.realtime.ingestion;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeType;
 import com.google.common.base.Function;
 import java.io.BufferedReader;
-import java.io.File;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.activation.UnsupportedDataTypeException;
 import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.client.ResultSet;
-import org.apache.pinot.plugin.stream.kinesis.KinesisConfig;
-import org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.StringUtil;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.SdkBytes;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
 import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-import software.amazon.awssdk.utils.AttributeMap;
 
 
-@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag = "2.3.2")
-public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSet {
+public class RealtimeKinesisIntegrationTest extends BaseKinesisIntegrationTest {

Review Comment:
   A lot of the common Kinesis setup code has been moved to BaseKinesisIntegrationTest.
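   Roughly, the shared base class covers the pieces whose imports were removed above (Localstack-backed Kinesis client, stream creation, record publishing). A hypothetical outline, not the actual class from this PR:

```java
import java.net.URI;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;

// Hypothetical outline of a BaseKinesisIntegrationTest-style helper; the real
// class may differ (Localstack lifecycle, stream readiness waits, etc.).
public abstract class BaseKinesisIntegrationTestOutline {

  // Assumed local endpoint; the real test derives it from the Localstack docker runner.
  protected static final String KINESIS_ENDPOINT = "http://localhost:4566";

  protected KinesisClient createKinesisClient() {
    return KinesisClient.builder()
        .endpointOverride(URI.create(KINESIS_ENDPOINT))
        .region(Region.US_EAST_1)
        .credentialsProvider(
            StaticCredentialsProvider.create(AwsBasicCredentials.create("access", "secret")))
        .build();
  }

  protected void createStream(KinesisClient kinesisClient, String streamName, int shardCount) {
    kinesisClient.createStream(
        CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build());
  }
}
```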



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java:
##########
@@ -38,41 +36,6 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest extends BaseRealtime
   private static final String KAFKA_TOPIC = "meetup";
   private static final int NUM_PARTITIONS = 1;
 
-  String getExternalView(String tableName)

Review Comment:
   These methods have been moved to ControllerTest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
