This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 3d46edb089 Kinesis partition split fixes (#15563)
3d46edb089 is described below

commit 3d46edb089325860a4c1d1f005dfb2d74139539f
Author: Krishan Goyal <kris...@startree.ai>
AuthorDate: Tue Apr 29 13:47:14 2025 +0530

    Kinesis partition split fixes (#15563)

    * Initial fixes to fix issues related to kinesis partition split
    * Refactor kinesis tests to make it easy to add more tests
    * Created a test case for shard increase and fixed bug related to end of consumption
    * Added more tests to test split / merge combinations with pause / resume / RVM triggers on an old / new table
    * Checkstyle fixes
    * Small refactors and attempting to see if we can consume from ZK offset always
    * Fix kafka regression with a workaround flag.
    * Add overridden function for test case
    * Add more testing around largest offset and concurrent pause / resume functionality
    * Avoid overriding pulsar behaviour to continue with current behaviour for now
    * Improving some documentation
    * Address PR comments
    * Retry message fetch outside kinesis consumer
    * Checkstyle fixes
---
 .../helix/core/PinotTableIdealStateBuilder.java    |   7 +-
 .../realtime/MissingConsumingSegmentFinder.java    |   2 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  39 +-
 .../pinot/controller/helix/ControllerTest.java     |  88 +++++
 .../PinotLLCRealtimeSegmentManagerTest.java        |   7 +
 .../realtime/RealtimeSegmentDataManager.java       |   5 +-
 .../tests/BaseClusterIntegrationTest.java          |  10 +-
 .../ingestion/BaseKinesisIntegrationTest.java      | 236 ++++++++++++
 ...aIncreaseDecreasePartitionsIntegrationTest.java |  67 +---
 .../realtime/ingestion/KinesisShardChangeTest.java | 425 +++++++++++++++++++++
 .../ingestion}/RealtimeKinesisIntegrationTest.java | 212 +---------
 .../realtime/ingestion/utils/KinesisUtils.java     | 112 ++++++
 .../plugin/stream/kinesis/KinesisConsumer.java     |  11 +
 .../kinesis/KinesisStreamMetadataProvider.java     |  42 +-
 .../kinesis/KinesisStreamMetadataProviderTest.java |  32 ++
 .../FreshnessBasedConsumptionStatusChecker.java    |  12 +-
 .../IngestionBasedConsumptionStatusChecker.java    |  21 +
 .../helix/OffsetBasedConsumptionStatusChecker.java |  21 +-
 .../spi/stream/PartitionGroupMetadataFetcher.java  |   9 +-
 .../pinot/spi/stream/StreamMetadataProvider.java   |  21 +
 .../utils/builder/ControllerRequestURLBuilder.java |   5 +
 21 files changed, 1090 insertions(+), 294 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 8895d9df50..244f7853d8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -85,11 +85,12 @@ public class PinotTableIdealStateBuilder {
    * partition groups.
    * The size of this list is equal to the number of partition groups,
    * and is created using the latest segment zk metadata.
+ * @param forceGetOffsetFromStream - details in PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset */ public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs, - List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { - PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = - new PartitionGroupMetadataFetcher(streamConfigs, partitionGroupConsumptionStatusList); + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList, boolean forceGetOffsetFromStream) { + PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new PartitionGroupMetadataFetcher( + streamConfigs, partitionGroupConsumptionStatusList, forceGetOffsetFromStream); try { DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index 5fe2ffe6d6..efc43246b7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -81,7 +81,7 @@ public class MissingConsumingSegmentFinder { return streamConfig; }); try { - PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList()) + PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), false) .forEach(metadata -> { _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); }); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index cead9ddfd4..765b25852a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1001,6 +1001,9 @@ public class PinotLLCRealtimeSegmentManager { partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream() .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index)) .collect(Collectors.toSet())); + } catch (UnsupportedOperationException ignored) { + allPartitionIdsFetched = false; + // Stream does not support fetching partition ids. 
There is a log in the fallback code which is sufficient } catch (Exception e) { allPartitionIdsFetched = false; LOGGER.warn("Failed to fetch partition ids for stream: {}", streamConfigs.get(i).getTopicName(), e); @@ -1035,7 +1038,20 @@ public class PinotLLCRealtimeSegmentManager { List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs, List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) { return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, - currentPartitionGroupConsumptionStatusList); + currentPartitionGroupConsumptionStatusList, false); + } + + /** + * Fetches the latest state of the PartitionGroups for the stream + * If any partition has reached end of life, and all messages of that partition have been consumed by the segment, + * it will be skipped from the result + */ + @VisibleForTesting + List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs, + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList, + boolean forceGetOffsetFromStream) { + return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, + currentPartitionGroupConsumptionStatusList, forceGetOffsetFromStream); } /** @@ -1460,7 +1476,7 @@ public class PinotLLCRealtimeSegmentManager { } // Create a map from partition id to the smallest stream offset Map<Integer, StreamPartitionMsgOffset> partitionIdToSmallestOffset = null; - if (offsetCriteria == OffsetCriteria.SMALLEST_OFFSET_CRITERIA) { + if (offsetCriteria != null && offsetCriteria.equals(OffsetCriteria.SMALLEST_OFFSET_CRITERIA)) { partitionIdToSmallestOffset = partitionIdToStartOffset; } @@ -1553,11 +1569,13 @@ public class PinotLLCRealtimeSegmentManager { // Smallest offset is fetched from stream once and cached in partitionIdToSmallestOffset. if (partitionIdToSmallestOffset == null) { - partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs); + partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs, idealState); } // Do not create new CONSUMING segment when the stream partition has reached end of life. if (!partitionIdToSmallestOffset.containsKey(partitionId)) { + LOGGER.info("PartitionGroup: {} has reached end of life. 
Skipping creation of new segment {}", + partitionId, latestSegmentName); continue; } @@ -1651,13 +1669,24 @@ public class PinotLLCRealtimeSegmentManager { } private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset( - List<StreamConfig> streamConfigs) { + List<StreamConfig> streamConfigs, IdealState idealState) { Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>(); for (StreamConfig streamConfig : streamConfigs) { + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + getPartitionGroupConsumptionStatusList(idealState, streamConfigs); OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + + // Kinesis shard-split flow requires us to pass currentPartitionGroupConsumptionStatusList so that + // we can check if its completely consumed + // However the kafka implementation of computePartitionGroupMetadata() breaks if we pass the current status + // This leads to streamSmallestOffset set to null in selectStartOffset() method + // The overall dependency isn't clean and is causing the issue and requires refactor + // Temporarily, we are passing a boolean flag to indicate if we want to use the current status + // The kafka implementation of computePartitionGroupMetadata() will ignore the current status + // while the kinesis implementation will use it. List<PartitionGroupMetadata> partitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList()); + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, true); streamConfig.setOffsetCriteria(originalOffsetCriteria); for (PartitionGroupMetadata metadata : partitionGroupMetadataList) { partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index e92a185985..a5b05031c6 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; @@ -65,6 +66,8 @@ import org.apache.pinot.controller.BaseControllerStarter; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.ControllerStarter; import org.apache.pinot.controller.api.access.AllowAllAccessFactory; +import org.apache.pinot.controller.api.resources.PauseStatusDetails; +import org.apache.pinot.controller.api.resources.TableViews; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -76,6 +79,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.NetUtils; import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; import 
org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -91,6 +95,9 @@ import static org.testng.Assert.*; public class ControllerTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerTest.class); + public static final String LOCAL_HOST = "localhost"; public static final String DEFAULT_DATA_DIR = new File(FileUtils.getTempDirectoryPath(), "test-controller-data-dir" + System.currentTimeMillis()).getAbsolutePath(); @@ -824,6 +831,87 @@ public class ControllerTest { } } + public void runRealtimeSegmentValidationTask(String tableName) + throws IOException { + runPeriodicTask("RealtimeSegmentValidationManager", tableName, TableType.REALTIME); + } + + public void runPeriodicTask(String taskName, String tableName, TableType tableType) + throws IOException { + sendGetRequest(getControllerRequestURLBuilder().forPeriodTaskRun(taskName, tableName, tableType)); + } + + public void pauseTable(String tableName) + throws IOException { + sendPostRequest(getControllerRequestURLBuilder().forPauseConsumption(tableName)); + TestUtils.waitForCondition((aVoid) -> { + try { + PauseStatusDetails pauseStatusDetails = + JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)), + PauseStatusDetails.class); + if (pauseStatusDetails.getConsumingSegments().isEmpty()) { + return true; + } + LOGGER.warn("Table not yet paused. Response " + pauseStatusDetails); + return false; + } catch (IOException e) { + throw new RuntimeException(e); + } + }, 2000, 60_000L, "Failed to pause table: " + tableName); + } + + public void resumeTable(String tableName) + throws IOException { + resumeTable(tableName, "lastConsumed"); + } + + public void resumeTable(String tableName, String offsetCriteria) + throws IOException { + sendPostRequest(getControllerRequestURLBuilder().forResumeConsumption(tableName) + + "?consumeFrom=" + offsetCriteria); + TestUtils.waitForCondition((aVoid) -> { + try { + PauseStatusDetails pauseStatusDetails = + JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)), + PauseStatusDetails.class); + // Its possible no segment is in consuming state, so check pause flag + if (!pauseStatusDetails.getPauseFlag()) { + return true; + } + LOGGER.warn("Pause flag is not yet set to false. Response " + pauseStatusDetails); + return false; + } catch (IOException e) { + throw new RuntimeException(e); + } + }, 2000, 60_000L, "Failed to resume table: " + tableName); + } + + public void waitForNumSegmentsInDesiredStateInEV(String tableName, String desiredState, + int desiredNumConsumingSegments, TableType type) { + TestUtils.waitForCondition((aVoid) -> { + try { + AtomicInteger numConsumingSegments = new AtomicInteger(0); + TableViews.TableView tableView = getExternalView(tableName, type); + Map<String, Map<String, String>> viewForType = + type.equals(TableType.OFFLINE) ? 
tableView._offline : tableView._realtime; + viewForType.values().forEach((v) -> { + numConsumingSegments.addAndGet((int) v.values().stream().filter((v1) -> v1.equals(desiredState)).count()); + }); + return numConsumingSegments.get() == desiredNumConsumingSegments; + } catch (IOException e) { + return false; + } + }, 5000, 60_000L, + "Failed to wait for " + desiredNumConsumingSegments + " consuming segments for table: " + tableName + ); + } + + public TableViews.TableView getExternalView(String tableName, TableType type) + throws IOException { + String state = sendGetRequest(getControllerRequestURLBuilder().forExternalView(tableName + "_" + type)); + return JsonUtils.stringToObject(state, TableViews.TableView.class); + } + public static String sendGetRequest(String urlString) throws IOException { return sendGetRequest(urlString, null); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index a86cf62e2e..25c286d3a2 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1832,6 +1832,13 @@ public class PinotLLCRealtimeSegmentManagerTest { } } + @Override + List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs, + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList, + boolean forceGetOffsetFromStream) { + return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); + } + @Override protected boolean isExceededMaxSegmentCompletionTime(String realtimeTableName, String segmentName, long currentTimeMs) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index d49b8484a6..ffda64c0c7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -479,7 +479,10 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { messageBatch.getMessageCount(), messageBatch.getUnfilteredMessageCount(), messageBatch.isEndOfPartitionGroup()); } - _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup(); + // We need to check for both endOfPartitionGroup and messageCount == 0, because + // endOfPartitionGroup can be true even if this is the last batch of messages (has been observed for kinesis) + // To process the last batch of messages, we need to set _endOfPartitionGroup to false in such a case + _endOfPartitionGroup = messageBatch.getMessageCount() == 0 && messageBatch.isEndOfPartitionGroup(); _consecutiveErrorCount = 0; } catch (PermanentConsumerException e) { _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L); diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index f8e11e7c5f..3156df8f65 100644 --- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -366,7 +366,12 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { */ protected TableConfig createRealtimeTableConfig(File sampleAvroFile) { AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile; - return new TableConfigBuilder(TableType.REALTIME) + return getTableConfigBuilder(TableType.REALTIME).build(); + } + + // TODO - Use this method to create table config for all table types to avoid redundant code + protected TableConfigBuilder getTableConfigBuilder(TableType tableType) { + return new TableConfigBuilder(tableType) .setTableName(getTableName()) .setTimeColumnName(getTimeColumnName()) .setSortedColumn(getSortedColumn()) @@ -384,8 +389,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { .setIngestionConfig(getIngestionConfig()) .setQueryConfig(getQueryConfig()) .setStreamConfigs(getStreamConfigs()) - .setNullHandlingEnabled(getNullHandlingEnabled()) - .build(); + .setNullHandlingEnabled(getNullHandlingEnabled()); } /** diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/BaseKinesisIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/BaseKinesisIntegrationTest.java new file mode 100644 index 0000000000..7842230084 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/BaseKinesisIntegrationTest.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.realtime.ingestion; + +import cloud.localstack.Localstack; +import cloud.localstack.ServiceName; +import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor; +import cloud.localstack.docker.annotation.LocalstackDockerConfiguration; +import cloud.localstack.docker.annotation.LocalstackDockerProperties; +import cloud.localstack.docker.command.Command; +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.integration.tests.BaseClusterIntegrationTest; +import org.apache.pinot.integration.tests.realtime.ingestion.utils.KinesisUtils; +import org.apache.pinot.plugin.stream.kinesis.KinesisConfig; +import org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.apache.pinot.util.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.SkipException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.apache.ApacheSdkHttpService; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.utils.AttributeMap; + + +/** + * Creates all dependencies (docker image, kinesis server, kinesis client, configs) for all tests requiring kinesis + */ +@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag = BaseKinesisIntegrationTest.LOCALSTACK_IMAGE) +abstract class BaseKinesisIntegrationTest extends BaseClusterIntegrationTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(BaseKinesisIntegrationTest.class); + + static final String LOCALSTACK_IMAGE = "2.3.2"; + private static final LocalstackDockerAnnotationProcessor PROCESSOR = new LocalstackDockerAnnotationProcessor(); + private final Localstack _localstackDocker = Localstack.INSTANCE; + protected KinesisClient _kinesisClient; + + private static final String REGION = "us-east-1"; + private static final String LOCALSTACK_KINESIS_ENDPOINT = "http://localhost:4566"; + protected static final String STREAM_TYPE = "kinesis"; + protected static final String STREAM_NAME = "kinesis-test"; + + @BeforeClass + public void setUp() + throws Exception { + try { + DockerInfoCommand dockerInfoCommand = new DockerInfoCommand(); + dockerInfoCommand.execute(); + } catch (IllegalStateException e) { + LOGGER.warn("Skipping kinesis tests! 
Docker is not found running", e); + throw new SkipException(e.getMessage()); + } + + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServer(); + + startKinesis(); + } + + @AfterClass + public void tearDown() + throws Exception { + stopServer(); + stopBroker(); + stopController(); + stopZk(); + stopKinesis(); + FileUtils.deleteDirectory(_tempDir); + } + + protected void createStream(int numShards) { + LOGGER.warn("Stream " + STREAM_NAME + " being created"); + _kinesisClient.createStream(CreateStreamRequest.builder().streamName(STREAM_NAME).shardCount(numShards).build()); + + TestUtils.waitForCondition(aVoid -> + KinesisUtils.isKinesisStreamActive(_kinesisClient, STREAM_NAME), 2000L, 60000, + "Kinesis stream " + STREAM_NAME + " is not created or is not in active state", true); + } + + protected void deleteStream() { + try { + _kinesisClient.deleteStream(DeleteStreamRequest.builder().streamName(STREAM_NAME).build()); + } catch (ResourceNotFoundException ignored) { + return; + } + TestUtils.waitForCondition(aVoid -> { + try { + KinesisUtils.getKinesisStreamStatus(_kinesisClient, STREAM_NAME); + } catch (ResourceNotFoundException e) { + return true; + } + return false; + }, 2000L, 60000, + "Kinesis stream " + STREAM_NAME + " is not deleted", true); + + LOGGER.warn("Stream " + STREAM_NAME + " deleted"); + } + + protected PutRecordResponse putRecord(String data, String partitionKey) { + PutRecordRequest putRecordRequest = + PutRecordRequest.builder().streamName(STREAM_NAME).data(SdkBytes.fromUtf8String(data)) + .partitionKey(partitionKey).build(); + return _kinesisClient.putRecord(putRecordRequest); + } + + @Override + public Map<String, String> getStreamConfigs() { + Map<String, String> streamConfigMap = new HashMap<>(); + String streamType = STREAM_TYPE; + streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType); + + streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, + StreamConfigProperties.STREAM_TOPIC_NAME), STREAM_NAME); + streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, + StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS), "30000"); + streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, + StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), KinesisConsumerFactory.class.getName()); + streamConfigMap.put( + StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS), + "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"); + streamConfigMap.put(KinesisConfig.REGION, REGION); + streamConfigMap.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString()); + streamConfigMap.put(KinesisConfig.ENDPOINT, LOCALSTACK_KINESIS_ENDPOINT); + streamConfigMap.put(KinesisConfig.ACCESS_KEY, getLocalAWSCredentials().resolveCredentials().accessKeyId()); + streamConfigMap.put(KinesisConfig.SECRET_KEY, getLocalAWSCredentials().resolveCredentials().secretAccessKey()); + streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, Integer.toString(2000)); + streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType, + StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest"); + return streamConfigMap; + } + + @Override + public TableConfig createRealtimeTableConfig(File sampleAvroFile) { + // Calls the super class to create the table config. 
+ // Properties like stream configs are overriden in the getStreamConfigs() method. + return super.createRealtimeTableConfig(sampleAvroFile); + } + + private void stopKinesis() { + if (_kinesisClient != null) { + _kinesisClient.close(); + } + if (_localstackDocker.isRunning()) { + _localstackDocker.stop(); + } + } + + private void startKinesis() + throws Exception { + LocalstackDockerConfiguration dockerConfig = PROCESSOR.process(this.getClass()); + StopAllLocalstackDockerCommand stopAllLocalstackDockerCommand = new StopAllLocalstackDockerCommand(); + stopAllLocalstackDockerCommand.execute(); + _localstackDocker.startup(dockerConfig); + + _kinesisClient = KinesisClient.builder().httpClient(new ApacheSdkHttpService().createHttpClientBuilder() + .buildWithDefaults( + AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE).build())) + .credentialsProvider(getLocalAWSCredentials()).region(Region.of(REGION)) + .endpointOverride(new URI(LOCALSTACK_KINESIS_ENDPOINT)).build(); + } + + private static class StopAllLocalstackDockerCommand extends Command { + + public void execute() { + String runningDockerContainers = + dockerExe.execute( + Arrays.asList("ps", "-a", "-q", "-f", "ancestor=localstack/localstack:" + LOCALSTACK_IMAGE)); + if (StringUtils.isNotBlank(runningDockerContainers) && !runningDockerContainers.toLowerCase().contains("error")) { + String[] containerList = runningDockerContainers.split("\n"); + + for (String containerId : containerList) { + dockerExe.execute(Arrays.asList("stop", containerId)); + } + } + } + } + + private static class DockerInfoCommand extends Command { + + public void execute() { + String dockerInfo = dockerExe.execute(Collections.singletonList("info")); + + if (dockerInfo.toLowerCase().contains("error")) { + throw new IllegalStateException("Docker daemon is not running!"); + } + } + } + + private static AwsCredentialsProvider getLocalAWSCredentials() { + return StaticCredentialsProvider.create(AwsBasicCredentials.create("access", "secret")); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java similarity index 54% rename from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java rename to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java index 4ea04f9895..3b40a6c417 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java @@ -16,21 +16,21 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.integration.tests; +package org.apache.pinot.integration.tests.realtime.ingestion; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.pinot.controller.api.resources.PauseStatusDetails; -import org.apache.pinot.controller.api.resources.TableViews; +import org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.utils.KafkaStarterUtils; -import org.apache.pinot.util.TestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; +import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.*; + public class KafkaIncreaseDecreasePartitionsIntegrationTest extends BaseRealtimeClusterIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaIncreaseDecreasePartitionsIntegrationTest.class); @@ -38,41 +38,6 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest extends BaseRealtime private static final String KAFKA_TOPIC = "meetup"; private static final int NUM_PARTITIONS = 1; - String getExternalView(String tableName) - throws IOException { - return sendGetRequest(getControllerRequestURLBuilder().forExternalView(tableName)); - } - - void pauseTable(String tableName) - throws IOException { - sendPostRequest(getControllerRequestURLBuilder().forPauseConsumption(tableName)); - TestUtils.waitForCondition((aVoid) -> { - try { - PauseStatusDetails pauseStatusDetails = - JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)), - PauseStatusDetails.class); - return pauseStatusDetails.getConsumingSegments().isEmpty(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, 60_000L, "Failed to pause table: " + tableName); - } - - void resumeTable(String tableName) - throws IOException { - sendPostRequest(getControllerRequestURLBuilder().forResumeConsumption(tableName)); - TestUtils.waitForCondition((aVoid) -> { - try { - PauseStatusDetails pauseStatusDetails = - JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)), - PauseStatusDetails.class); - return !pauseStatusDetails.getConsumingSegments().isEmpty(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, 60_000L, "Failed to resume table: " + tableName); - } - String createTable() throws IOException { Schema schema = createSchema("simpleMeetup_schema.json"); @@ -83,24 +48,6 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest extends BaseRealtime return tableConfig.getTableName(); } - void waitForNumConsumingSegmentsInEV(String tableName, int desiredNumConsumingSegments) { - TestUtils.waitForCondition((aVoid) -> { - try { - AtomicInteger numConsumingSegments = new AtomicInteger(0); - String state = getExternalView(tableName); - TableViews.TableView tableView = JsonUtils.stringToObject(state, TableViews.TableView.class); - tableView._realtime.values().forEach((v) -> { - numConsumingSegments.addAndGet((int) v.values().stream().filter((v1) -> v1.equals("CONSUMING")).count()); - }); - return numConsumingSegments.get() == desiredNumConsumingSegments; - } catch (IOException e) { - LOGGER.error("Exception in waitForNumConsumingSegments: {}", e.getMessage()); - return false; - } - }, 5000, 300_000L, - "Failed to wait for " 
+ desiredNumConsumingSegments + " consuming segments for table: " + tableName); - } - @Test public void testDecreasePartitions() throws Exception { @@ -108,7 +55,7 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest extends BaseRealtime LOGGER.info("Creating Kafka topic with {} partitions", NUM_PARTITIONS + 2); _kafkaStarters.get(0).createTopic(KAFKA_TOPIC, KafkaStarterUtils.getTopicCreationProps(NUM_PARTITIONS + 2)); String tableName = createTable(); - waitForNumConsumingSegmentsInEV(tableName, NUM_PARTITIONS + 2); + waitForNumSegmentsInDesiredStateInEV(tableName, CONSUMING, NUM_PARTITIONS + 2, TableType.REALTIME); pauseTable(tableName); @@ -118,7 +65,7 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest extends BaseRealtime _kafkaStarters.get(0).createTopic(KAFKA_TOPIC, KafkaStarterUtils.getTopicCreationProps(NUM_PARTITIONS)); resumeTable(tableName); - waitForNumConsumingSegmentsInEV(tableName, NUM_PARTITIONS); + waitForNumSegmentsInDesiredStateInEV(tableName, CONSUMING, NUM_PARTITIONS, TableType.REALTIME); } @Test(enabled = false) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KinesisShardChangeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KinesisShardChangeTest.java new file mode 100644 index 0000000000..22220e26eb --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KinesisShardChangeTest.java @@ -0,0 +1,425 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.realtime.ingestion; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.pinot.controller.api.resources.TableViews; +import org.apache.pinot.integration.tests.realtime.ingestion.utils.KinesisUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.util.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; + +import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING; +import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE; + + +public class KinesisShardChangeTest extends BaseKinesisIntegrationTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisShardChangeTest.class); + + private static final String SCHEMA_FILE_PATH = "kinesis/airlineStats_data_reduced.schema"; + private static final String DATA_FILE_PATH = "kinesis/airlineStats_data_reduced.json"; + private static final Integer NUM_SHARDS = 2; + + @BeforeMethod + public void beforeTest() + throws IOException { + createStream(NUM_SHARDS); + addSchema(createSchema(SCHEMA_FILE_PATH)); + TableConfig tableConfig = createRealtimeTableConfig(null); + addTableConfig(tableConfig); + } + + @AfterMethod + public void afterTest() + throws IOException { + dropRealtimeTable(getTableName()); + deleteSchema(getTableName()); + deleteStream(); + } + + /** + * Data provider for shard split and merge tests with different offset combinations. + * Documentation is in the test method. + */ + @DataProvider(name = "shardOffsetCombinations") + public Object[][] shardOffsetCombinations() { + return new Object[][]{ + {"split", "smallest", "lastConsumed", 100, 250, 4, 4}, + {"split", "smallest", null, 100, 250, 4, 4}, + {"split", "largest", "lastConsumed", 50, 200, 2, 4}, + {"split", "largest", null, 50, 200, 2, 4}, + {"split", "lastConsumed", "lastConsumed", 200, 200, 6, 4}, + {"split", "lastConsumed", "largest", 200, 200, 6, 4}, + {"split", "lastConsumed", null, 200, 200, 2, 4}, + {"split", null, null, 200, 200, 2, 4}, + {"merge", "smallest", "lastConsumed", 100, 250, 4, 1}, + {"merge", "smallest", null, 100, 250, 4, 1}, + {"merge", "largest", "lastConsumed", 50, 200, 2, 1}, + {"merge", "largest", null, 50, 200, 2, 1}, + {"merge", "lastConsumed", "lastConsumed", 200, 200, 3, 1}, + {"merge", "lastConsumed", "largest", 200, 200, 3, 1}, + {"merge", "lastConsumed", null, 200, 200, 2, 1}, + {"merge", null, null, 200, 200, 2, 1}, + }; + } + + /** + * Test case to validate shard split/merge behavior with different offset combinations. + * The expectation is that + * 1. when "smallest" offset is used, the old parent shards would be consumed first. 
+ * New shards will not be consumed until RVM is run or resume() is called with lastConsumed / largest offset + * 2. when "largest" offset is used, only new records would be consumed and all prior records pushed to kinesis + * would be skipped. + * 3. when "lastConsumed" offset is used, data would be consumed based on the last consumed offset. + * 4. when RealtimeSegmentValidationManager is triggered, the behaviour should be same as calling resume() with + * "lastConsumed" offset. + * @param operation - "split" or "merge" + * @param firstOffsetCriteria - Offset criteria for the first resume call. + * If it's null, RealtimeSegmentValidationManager is triggered + * @param secondOffsetCriteria - Offset criteria for the second resume call. + * If it's null, RealtimeSegmentValidationManager is triggered + * @param firstExpectedRecords - Expected records after the first resume call + * @param secondExpectedRecords - Expected records after the second resume call + * @param expectedOnlineSegments - Expected number of online segments in the end + * @param expectedConsumingSegments - Expected Number of consuming segments in the end + */ + @Test(dataProvider = "shardOffsetCombinations") + public void testShardOperationsWithOffsets(String operation, String firstOffsetCriteria, String secondOffsetCriteria, + int firstExpectedRecords, int secondExpectedRecords, int expectedOnlineSegments, + int expectedConsumingSegments) + throws Exception { + + // Publish initial records and wait for them to be consumed + publishRecordsToKinesis(0, 50); + waitForRecordsToBeConsumed(getTableName(), 50); // pinot has created 2 segments + + // Perform shard operation (split or merge) + if ("split".equals(operation)) { + KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits shard 0 into shard 2 & 3 + KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits shard 1 into shard 4 & 5 + } else if ("merge".equals(operation)) { + KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 0, 1); // merges shard 0 & 1 into shard 2 + } + + // Publish more records after shard operation. These will go to the new shards + publishRecordsToKinesis(50, 200); + + if (firstOffsetCriteria != null) { + // Pause and resume with the first offset criteria + pauseTable(getTableName()); // This will commit the current segments + resumeTable(getTableName(), firstOffsetCriteria); + } else { + runRealtimeSegmentValidationTask(getTableName()); + } + + waitForRecordsToBeConsumed(getTableName(), firstExpectedRecords); // Pinot will create new segments + + if (secondOffsetCriteria != null) { + // Pause and resume with the second offset criteria + pauseTable(getTableName()); // This will commit the current segments + resumeTable(getTableName(), secondOffsetCriteria); + } else { + runRealtimeSegmentValidationTask(getTableName()); + } + + waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords); // Pinot will create new segments + + // Publish more records after shard operation. These will go to the new shards + publishRecordsToKinesis(100, 200); + if (secondOffsetCriteria != null && secondOffsetCriteria.equals("largest")) { + // TODO - Fix this. Remove the check for largest offset. If largest offset is used, + // we should have consumed the 100 records published after table was resumed. + // Currently this is not happening. 
Thus the assertion is without the new records + // We currently rely on RVM to fix the consumption + waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords); + } else { + waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords + 100); + } + + runRealtimeSegmentValidationTask(getTableName()); + waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords + 100); + + // Validate the final state of segments + validateSegmentStates(getTableName(), expectedOnlineSegments, expectedConsumingSegments); + } + + /** + * Data provider for new table tests with different offset combinations. + * Documentation is in the test method. + */ + @DataProvider(name = "initialOffsetCombinations") + public Object[][] initialOffsetCombinations() { + return new Object[][]{ + {"smallest", 50, 200}, + {"largest", 50, 200}, // TODO - Validate if table created with largest offset should not consume old records + {"lastConsumed", 50, 200} + }; + } + + /** + * Test case to split shards, then create new table and check consumption + * For the sake of brevity, we will only test shard split and calling Realtime Validation Manager + * Individually, pause and resume have been verified for shard split / merge operations + */ + @Test(dataProvider = "initialOffsetCombinations") + public void testNewTableAfterShardSplit(String offsetCriteria, int firstExpectedRecords, int secondExpectedRecords) + throws Exception { + // Publish initial records + publishRecordsToKinesis(0, 50); + + // Split the shards + KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits shard 0 into shard 2 & 3 + KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits shard 1 into shard 4 & 5 + + // new table is created with defined offset criteria but listening to the original stream + String name = getTableName() + "_" + offsetCriteria; + createNewSchemaAndTable(name, offsetCriteria); + + waitForRecordsToBeConsumed(name, firstExpectedRecords); + + // publish more records. These will go to the new shards + publishRecordsToKinesis(50, 200); + waitForRecordsToBeConsumed(name, firstExpectedRecords); // pinot doesn't listen to new shards yet. + + // Trigger RVM. This will commit the current segments and start consuming from the new shards + runRealtimeSegmentValidationTask(name); + waitForRecordsToBeConsumed(name, secondExpectedRecords); + + // Validate the final state of segments + validateSegmentStates(name, 2, 4); + + dropNewSchemaAndTable(name); + } + + /** + * Test case to first split shards, then merge some shards. + * For the sake of brevity, we will only test by calling Realtime Validation Manager + * Individually, pause and resume have been verified for shard split / merge operations + */ + @Test + public void testSplitAndMergeShards() + throws Exception { + // Publish initial records + publishRecordsToKinesis(0, 50); + waitForRecordsToBeConsumed(getTableName(), 50); // pinot has created 2 segments + + // Split the shards + KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits shard 0 into shard 2 & 3 + KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits shard 1 into shard 4 & 5 + + // Publish more records after shard operation. These will go to the new shards + publishRecordsToKinesis(50, 175); + + // Merge some shards + KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 2, 3); // merges shard 2 & 3 into shard 6 + KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 4, 5); // merges shard 4 & 5 into shard 7 + + // Publish more records after shard operation. 
These will go to the new shards + publishRecordsToKinesis(175, 200); + + // Trigger RVM. This will commit segments 0 and 1 and start consuming from shards 2-5 + runRealtimeSegmentValidationTask(getTableName()); + waitForRecordsToBeConsumed(getTableName(), 175); + + // Trigger RVM. This will commit segments 2-5 and start consuming from shards 6-7 + runRealtimeSegmentValidationTask(getTableName()); + waitForRecordsToBeConsumed(getTableName(), 200); + + // Validate that 8 segments are created in total + validateSegmentStates(getTableName(), 6, 2); + } + + /** + * Test case to continuously publish records to kinesis (in a background thread) and concurrently split shards + * and concurrently call pause and resume APIs or RVM and finally validate the total count of records + */ + @Test + public void testConcurrentShardSplit() + throws IOException, InterruptedException { + // Start a background thread to continuously publish records to kinesis + Thread publisherThread = new Thread(() -> { + try { + for (int i = 0; i < 200; i += 5) { + publishRecordsToKinesis(i, i + 5); + Thread.sleep(1000); + } + } catch (Exception e) { + LOGGER.error("Error while publishing records to kinesis", e); + } + }); + publisherThread.start(); // This will take ~40 secs to complete with 5 records ingested per second + + Thread.sleep(5000); + + // Split the shards + KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits shard 0 into shard 2 & 3 + KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits shard 1 into shard 4 & 5 + + Thread.sleep(5000); + + // Trigger RVM. This will commit segments 0 and 1 and start consuming from shards 2-5 + runRealtimeSegmentValidationTask(getTableName()); // This will commit segments 0-1 and start consuming from 2-5 + + // Merge some shards + KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 2, 3); // merges shard 2 & 3 into shard 6 + KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 4, 5); // merges shard 4 & 5 into shard 7 + + Thread.sleep(5000); + + // Call pause and resume APIs + pauseTable(getTableName()); // This will commit segments 2-5 + resumeTable(getTableName(), "lastConsumed"); // start consuming from shards 6-7 + + // Wait for the publisher thread to finish + try { + publisherThread.join(); + } catch (InterruptedException e) { + LOGGER.error("Error while waiting for publisher thread to finish", e); + } + + waitForRecordsToBeConsumed(getTableName(), 200); + + // Validate that all records are consumed + validateSegmentStates(getTableName(), 6, 2); + } + + private void validateSegmentStates(String tableName, int expectedOnlineSegments, int expectedConsumingSegments) + throws IOException { + TableViews.TableView tableView = getExternalView(tableName, TableType.REALTIME); + Assert.assertEquals(tableView._realtime.size(), expectedOnlineSegments + expectedConsumingSegments); + + List<String> onlineSegments = tableView._realtime.entrySet().stream() + .filter(x -> x.getValue().containsValue(ONLINE)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + Assert.assertEquals(onlineSegments.size(), expectedOnlineSegments); + + List<String> consumingSegments = tableView._realtime.entrySet().stream() + .filter(x -> x.getValue().containsValue(CONSUMING)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + Assert.assertEquals(consumingSegments.size(), expectedConsumingSegments); + } + + /** + * start and end offsets are essentially the start row index and end row index of the file + * + * @param startOffset - inclusive + * @param endOffset 
- exclusive + */ + private void publishRecordsToKinesis(int startOffset, int endOffset) + throws Exception { + InputStream inputStream = RealtimeKinesisIntegrationTest.class.getClassLoader() + .getResourceAsStream(KinesisShardChangeTest.DATA_FILE_PATH); + assert inputStream != null; + try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + String line; + int count = 0; + while ((line = br.readLine()) != null) { + // Skip the first startOffset lines + if (count < startOffset) { + count++; + continue; + } + if (count++ >= endOffset) { + break; + } + JsonNode data = JsonUtils.stringToJsonNode(line); + PutRecordResponse putRecordResponse = putRecord(line, data.get("Origin").textValue()); + if (putRecordResponse.sdkHttpResponse().statusCode() != 200) { + throw new RuntimeException("Failed to put record " + line + " to Kinesis stream with status code: " + + putRecordResponse.sdkHttpResponse().statusCode()); + } + } + } + } + + private void waitForRecordsToBeConsumed(String tableName, int expectedNumRecords) + throws InterruptedException { + TestUtils.waitForCondition(aVoid -> { + try { + long count = getPinotConnection().execute("SELECT COUNT(*) FROM " + tableName).getResultSet(0).getLong(0); + if (count != expectedNumRecords) { + LOGGER.warn("Expected {} records, but got {} records. Retrying", expectedNumRecords, count); + } + return count == expectedNumRecords; + } catch (Exception e) { + return false; + } + }, 2000, 60_000L, "Wait for all records to be ingested"); + // Sleep for few secs and validate the count again (to ensure no more records are ingested) + Thread.sleep(2000); + long count = getPinotConnection().execute("SELECT COUNT(*) FROM " + tableName).getResultSet(0).getLong(0); + Assert.assertEquals(count, expectedNumRecords); + } + + private void createNewSchemaAndTable(String name, String offsetCriteria) + throws IOException { + Schema schema = createSchema(SCHEMA_FILE_PATH); + schema.setSchemaName(name); + addSchema(schema); + + TableConfigBuilder tableConfigBuilder = getTableConfigBuilder(TableType.REALTIME); + tableConfigBuilder.setTableName(name); + Map<String, String> streamConfigs = getStreamConfigs(); + streamConfigs.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, + StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), offsetCriteria); + tableConfigBuilder.setStreamConfigs(streamConfigs); + TableConfig tableConfig = tableConfigBuilder.build(); + addTableConfig(tableConfig); + } + + private void dropNewSchemaAndTable(String name) + throws IOException { + dropRealtimeTable(name); + deleteSchema(name); + } + + @Override + public List<String> getNoDictionaryColumns() { + return Collections.emptyList(); + } + + @Override + public String getSortedColumn() { + return null; + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java similarity index 51% rename from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java rename to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java index 1e33a0b7fe..1f5569a603 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java +++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java @@ -16,29 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.integration.tests; - -import cloud.localstack.Localstack; -import cloud.localstack.ServiceName; -import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor; -import cloud.localstack.docker.annotation.LocalstackDockerConfiguration; -import cloud.localstack.docker.annotation.LocalstackDockerProperties; -import cloud.localstack.docker.command.Command; +package org.apache.pinot.integration.tests.realtime.ingestion; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeType; import com.google.common.base.Function; import java.io.BufferedReader; -import java.io.File; import java.io.InputStream; import java.io.InputStreamReader; -import java.net.URI; -import java.net.URL; import java.nio.charset.StandardCharsets; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -46,95 +36,45 @@ import java.util.List; import java.util.Map; import javax.activation.UnsupportedDataTypeException; import javax.annotation.Nullable; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.client.ResultSet; -import org.apache.pinot.plugin.stream.kinesis.KinesisConfig; -import org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.StringUtil; -import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.util.TestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.SdkBytes; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; -import software.amazon.awssdk.http.apache.ApacheSdkHttpService; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisClient; -import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; -import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; -import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; -import software.amazon.awssdk.utils.AttributeMap; -@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag = "2.3.2") -public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSet { +public class RealtimeKinesisIntegrationTest extends BaseKinesisIntegrationTest { private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeKinesisIntegrationTest.class); - private static final LocalstackDockerAnnotationProcessor PROCESSOR = new LocalstackDockerAnnotationProcessor(); - private static final String STREAM_NAME = "kinesis-test"; - private static final String STREAM_TYPE = "kinesis"; - - public static final String REGION = "us-east-1"; - public static final String LOCALSTACK_KINESIS_ENDPOINT = "http://localhost:4566"; - public static final int NUM_SHARDS = 10; + private static final int NUM_SHARDS = 10; // Localstack Kinesis doesn't support large rows. // So, this airlineStats data file consists of only few fields and rows from the original data - public static final String SCHEMA_FILE_PATH = "kinesis/airlineStats_data_reduced.schema"; - public static final String DATA_FILE_PATH = "kinesis/airlineStats_data_reduced.json"; - - private final Localstack _localstackDocker = Localstack.INSTANCE; - - private static KinesisClient _kinesisClient = null; + private static final String SCHEMA_FILE_PATH = "kinesis/airlineStats_data_reduced.schema"; + private static final String DATA_FILE_PATH = "kinesis/airlineStats_data_reduced.json"; private long _totalRecordsPushedInStream = 0; List<String> _h2FieldNameAndTypes = new ArrayList<>(); - private boolean _skipTestNoDockerInstalled = false; - @BeforeClass public void setUp() throws Exception { - try { - DockerInfoCommand dockerInfoCommand = new DockerInfoCommand(); - dockerInfoCommand.execute(); - } catch (IllegalStateException e) { - _skipTestNoDockerInstalled = true; - LOGGER.warn("Skipping test! Docker is not found running", e); - throw new SkipException(e.getMessage()); - } - - TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); - - // Start the Pinot cluster - startZk(); - startController(); - startBroker(); - startServer(); + super.setUp(); - // Start Kinesis - startKinesis(); + // Create new stream + createStream(NUM_SHARDS); // Create and upload the schema and table config - addSchema(createKinesisSchema()); - addTableConfig(createKinesisTableConfig()); + addSchema(createSchema(SCHEMA_FILE_PATH)); + addTableConfig(createRealtimeTableConfig(null)); createH2ConnectionAndTable(); @@ -145,13 +85,6 @@ public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSe waitForAllDocsLoadedKinesis(120_000L); } - public Schema createKinesisSchema() - throws Exception { - URL resourceUrl = BaseClusterIntegrationTest.class.getClassLoader().getResource(SCHEMA_FILE_PATH); - Assert.assertNotNull(resourceUrl); - return Schema.fromFile(new File(resourceUrl.getFile())); - } - protected void waitForAllDocsLoadedKinesis(long timeoutMs) throws Exception { waitForAllDocsLoadedKinesis(timeoutMs, true); @@ -172,79 +105,14 @@ public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSe }, 1000L, timeoutMs, "Failed to load " + _totalRecordsPushedInStream + " documents", raiseError); } - public TableConfig createKinesisTableConfig() { - return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()) - .setTimeColumnName("DaysSinceEpoch").setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()) - .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()) - .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) - .setStreamConfigs(createKinesisStreamConfig()).setNullHandlingEnabled(getNullHandlingEnabled()).build(); - } - - public Map<String, String> createKinesisStreamConfig() { - 
Map<String, String> streamConfigMap = new HashMap<>(); - String streamType = "kinesis"; - streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType); - - streamConfigMap.put( - StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME), - STREAM_NAME); - - streamConfigMap.put( - StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS), - "30000"); - streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, - StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), KinesisConsumerFactory.class.getName()); - streamConfigMap.put( - StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS), - "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"); - streamConfigMap.put(KinesisConfig.REGION, REGION); - streamConfigMap.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString()); - streamConfigMap.put(KinesisConfig.ENDPOINT, LOCALSTACK_KINESIS_ENDPOINT); - streamConfigMap.put(KinesisConfig.ACCESS_KEY, getLocalAWSCredentials().resolveCredentials().accessKeyId()); - streamConfigMap.put(KinesisConfig.SECRET_KEY, getLocalAWSCredentials().resolveCredentials().secretAccessKey()); - streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, Integer.toString(200)); - streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType, - StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest"); - return streamConfigMap; - } - - public void startKinesis() - throws Exception { - final LocalstackDockerConfiguration dockerConfig = PROCESSOR.process(this.getClass()); - StopAllLocalstackDockerCommand stopAllLocalstackDockerCommand = new StopAllLocalstackDockerCommand(); - stopAllLocalstackDockerCommand.execute(); - _localstackDocker.startup(dockerConfig); - - _kinesisClient = KinesisClient.builder().httpClient(new ApacheSdkHttpService().createHttpClientBuilder() - .buildWithDefaults( - AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE).build())) - .credentialsProvider(getLocalAWSCredentials()).region(Region.of(REGION)) - .endpointOverride(new URI(LOCALSTACK_KINESIS_ENDPOINT)).build(); - - _kinesisClient.createStream(CreateStreamRequest.builder().streamName(STREAM_NAME).shardCount(NUM_SHARDS).build()); - - TestUtils.waitForCondition(new Function<Void, Boolean>() { - @Nullable - @Override - public Boolean apply(@Nullable Void aVoid) { - try { - String kinesisStreamStatus = - _kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(STREAM_NAME).build()) - .streamDescription().streamStatusAsString(); - - return kinesisStreamStatus.contentEquals("ACTIVE"); - } catch (Exception e) { - LOGGER.warn("Could not fetch kinesis stream status", e); - return null; - } - } - }, 1000L, 30000, "Kinesis stream " + STREAM_NAME + " is not created or is not in active state", true); + @Override + public List<String> getNoDictionaryColumns() { + return Collections.emptyList(); } - public void stopKinesis() { - if (_localstackDocker.isRunning()) { - _localstackDocker.stop(); - } + @Override + public String getSortedColumn() { + return null; } private void publishRecordsToKinesis() { @@ -264,10 +132,7 @@ public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSe while ((line = br.readLine()) != null) { JsonNode data = JsonUtils.stringToJsonNode(line); - PutRecordRequest putRecordRequest = - 
PutRecordRequest.builder().streamName(STREAM_NAME).data(SdkBytes.fromUtf8String(line)) - .partitionKey(data.get("Origin").textValue()).build(); - PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest); + PutRecordResponse putRecordResponse = putRecord(line, data.get("Origin").textValue()); if (putRecordResponse.sdkHttpResponse().statusCode() == 200) { if (StringUtils.isNotBlank(putRecordResponse.sequenceNumber()) && StringUtils.isNotBlank( putRecordResponse.shardId())) { @@ -303,10 +168,6 @@ public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSe } } - private static AwsCredentialsProvider getLocalAWSCredentials() { - return StaticCredentialsProvider.create(AwsBasicCredentials.create("access", "secret")); - } - @Test public void testRecords() throws Exception { @@ -435,42 +296,7 @@ public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSe @AfterClass public void tearDown() throws Exception { - if (_skipTestNoDockerInstalled) { - return; - } - dropRealtimeTable(getTableName()); - stopServer(); - stopBroker(); - stopController(); - stopZk(); - stopKinesis(); - FileUtils.deleteDirectory(_tempDir); - } - - public static class StopAllLocalstackDockerCommand extends Command { - - public void execute() { - String runningDockerContainers = - dockerExe.execute(Arrays.asList("ps", "-a", "-q", "-f", "ancestor=localstack/localstack")); - if (StringUtils.isNotBlank(runningDockerContainers) && !runningDockerContainers.toLowerCase().contains("error")) { - String[] containerList = runningDockerContainers.split("\n"); - - for (String containerId : containerList) { - dockerExe.execute(Arrays.asList("stop", containerId)); - } - } - } - } - - public static class DockerInfoCommand extends Command { - - public void execute() { - String dockerInfo = dockerExe.execute(Collections.singletonList("info")); - - if (dockerInfo.toLowerCase().contains("error")) { - throw new IllegalStateException("Docker daemon is not running!"); - } - } + super.tearDown(); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/utils/KinesisUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/utils/KinesisUtils.java new file mode 100644 index 0000000000..8f72d6f81e --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/utils/KinesisUtils.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.realtime.ingestion.utils; + +import java.math.BigInteger; +import java.time.Duration; +import java.util.List; +import org.apache.pinot.util.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; +import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest; +import software.amazon.awssdk.services.kinesis.model.MergeShardsResponse; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.SplitShardRequest; +import software.amazon.awssdk.services.kinesis.model.SplitShardResponse; + + +public class KinesisUtils { + + private KinesisUtils() { + } + + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisUtils.class); + + public static void splitNthShard(KinesisClient kinesisClient, String stream, int index) { + List<Shard> shards = getShards(kinesisClient, stream); + int initialSize = shards.size(); + splitShard(kinesisClient, stream, shards.get(index)); + LOGGER.info("Split shard with ID: " + shards.get(index).shardId()); + + TestUtils.waitForCondition((avoid) -> isKinesisStreamActive(kinesisClient, stream) + && getShards(kinesisClient, stream).size() == initialSize + 2, + 2000, Duration.ofMinutes(1).toMillis(), "Waiting for Kinesis stream to be active and shards to be split"); + } + + public static void mergeShards(KinesisClient kinesisClient, String stream, int index1, int index2) { + List<Shard> shards = getShards(kinesisClient, stream); + int initialSize = shards.size(); + mergeShard(kinesisClient, stream, shards.get(index1), shards.get(index2)); + LOGGER.info("Merged shards with IDs: " + shards.get(index1).shardId() + " and " + shards.get(index2).shardId()); + + TestUtils.waitForCondition((avoid) -> isKinesisStreamActive(kinesisClient, stream) + && getShards(kinesisClient, stream).size() == initialSize + 1, + 2000, Duration.ofMinutes(1).toMillis(), "Waiting for Kinesis stream to be active and shards to be merged"); + } + + public static boolean isKinesisStreamActive(KinesisClient kinesisClient, String streamName) { + try { + String kinesisStreamStatus = getKinesisStreamStatus(kinesisClient, streamName); + boolean isActive = kinesisStreamStatus.contentEquals("ACTIVE"); + if (!isActive) { + LOGGER.warn("Kinesis stream " + streamName + " in state " + kinesisStreamStatus); + } + return isActive; + } catch (ResourceNotFoundException e) { + LOGGER.warn("Kinesis stream " + streamName + " not found"); + return false; + } + } + + public static String getKinesisStreamStatus(KinesisClient kinesisClient, String streamName) { + return kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(streamName).build()) + .streamDescription().streamStatusAsString(); + } + + private static List<Shard> getShards(KinesisClient kinesisClient, String stream) { + ListShardsResponse listShardsResponse = + kinesisClient.listShards(ListShardsRequest.builder().streamName(stream).build()); + return listShardsResponse.shards(); + } + + private static SplitShardResponse splitShard(KinesisClient kinesisClient, String stream, Shard shard) { + BigInteger startHash = new 
BigInteger(shard.hashKeyRange().startingHashKey()); + BigInteger endHash = new BigInteger(shard.hashKeyRange().endingHashKey()); + BigInteger newStartingHashKey = startHash.add(endHash).divide(new BigInteger("2")); + return kinesisClient.splitShard(SplitShardRequest.builder() + .shardToSplit(shard.shardId()) + .streamName(stream) + .newStartingHashKey(newStartingHashKey.toString()) + .build()); + } + + private static MergeShardsResponse mergeShard(KinesisClient kinesisClient, String stream, Shard shard1, + Shard shard2) { + return kinesisClient.mergeShards(MergeShardsRequest.builder() + .shardToMerge(shard1.shardId()) + .adjacentShardToMerge(shard2.shardId()) + .streamName(stream) + .build()); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index de876b3071..f5a905e111 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -61,6 +61,16 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti super(config, kinesisClient); } + /** + * Based on Kinesis documentation, we might get a response with empty records but a non-null nextShardIterator. + * Known cases are: + * 1. When the shard has ended (has been split or merged) and we need a couple of calls to getRecords() to reach + * a null iterator + * 2. When there are no new messages in the shard but the shard is active. We will continue to get a non-null + * nextShardIterator in this case + * 3. When there are some messages in the shard, but we need a few iterations to get them. + * This needs to be handled by the client based on appropriate retry strategy. + */ @Override public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) { try { @@ -98,6 +108,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti GetRecordsRequest getRecordRequest = GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build(); GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest); + List<Record> records = getRecordsResponse.records(); List<BytesStreamMessage> messages; KinesisPartitionGroupOffset offsetOfNextBatch; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java index 612ea38098..d9b5f17e39 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java @@ -197,6 +197,23 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { return newPartitionGroupMetadataList; } + /** + * Refer documentation for {@link #computePartitionGroupMetadata(String, StreamConfig, List, int)} + * @param forceGetOffsetFromStream - the flag is not required for Kinesis stream. 
The Kinesis implementation + * takes care of returning non-null offsets for all old and new partitions. + * The flag is primarily required for the Kafka stream, which requires refactoring + * to avoid this flag. More details in {@link + * StreamMetadataProvider#computePartitionGroupMetadata( + * String, StreamConfig, List, int, boolean)} + */ + @Override + public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientId, StreamConfig streamConfig, + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, int timeoutMillis, + boolean forceGetOffsetFromStream) + throws IOException, TimeoutException { + return computePartitionGroupMetadata(clientId, streamConfig, partitionGroupConsumptionStatuses, timeoutMillis); + } + /** * Converts a shardId string to a partitionGroupId integer by parsing the digits of the shardId * e.g. "shardId-000000000001" becomes 1 @@ -213,8 +230,29 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { throws IOException, TimeoutException { try (PartitionGroupConsumer partitionGroupConsumer = _kinesisStreamConsumerFactory.createPartitionGroupConsumer( _clientId, partitionGroupConsumptionStatus)) { - MessageBatch<?> messageBatch = partitionGroupConsumer.fetchMessages(startCheckpoint, _fetchTimeoutMs); - return messageBatch.getMessageCount() == 0 && messageBatch.isEndOfPartitionGroup(); + int attempts = 0; + while (true) { + MessageBatch<?> messageBatch = partitionGroupConsumer.fetchMessages(startCheckpoint, _fetchTimeoutMs); + if (messageBatch.getMessageCount() > 0) { + // There are messages left to be consumed so we haven't consumed the shard fully + return false; + } + if (messageBatch.isEndOfPartitionGroup()) { + // Shard can't be iterated further. We have consumed all the messages because message count = 0 + return true; + } + // Even though message count = 0, shard can be iterated further. + // Based on Kinesis documentation, there might be more records to be consumed. + // So we need to fetch messages again to check if we have reached the end of the shard. + // To prevent an infinite loop (known cases listed in fetchMessages()), we will limit the number of attempts + attempts++; + if (attempts >= 5) { + LOGGER.warn("Reached max attempts to check if the end of the shard was reached from checkpoint {}. " + + "Assuming we have not consumed until the end of the shard.", startCheckpoint); + return false; + } + // Continue to fetch messages; 
reusing the partitionGroupConsumer ensures we use new shard iterator + } } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java index c6cf493370..7ad46919f2 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java @@ -179,6 +179,38 @@ public class KinesisStreamMetadataProviderTest { Assert.assertEquals(result.size(), 1); Assert.assertEquals(result.get(0).getPartitionGroupId(), 1); Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(), 1); + + // Simulate the case where initial calls to fetchMessages returns empty messages but non-null next shard iterator + when(_partitionGroupConsumer.fetchMessages(checkpointArgs.capture(), intArguments.capture())) + .thenReturn(new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, false)) + .thenReturn(new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, false)) + .thenReturn(new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, true)); + result = + _kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(), + currentPartitionGroupMeta, TIMEOUT); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.get(0).getPartitionGroupId(), 1); + Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(), 1); + + // Simulate the case where all calls to fetchMessages returns empty messages and non-null next shard iterator + when(_partitionGroupConsumer.fetchMessages(checkpointArgs.capture(), intArguments.capture())) + .thenReturn(new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, false)) + .thenReturn(new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, false)) + .thenReturn(new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, false)) + .thenReturn(new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, false)) + .thenReturn(new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, false)) + .thenReturn(new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, false)) + .thenReturn(new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, false)); + + result = + _kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(), + currentPartitionGroupMeta, TIMEOUT); + + Assert.assertEquals(result.size(), 2); + Assert.assertEquals(result.get(0).getPartitionGroupId(), 0); + Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(), 1); + Assert.assertEquals(result.get(1).getPartitionGroupId(), 1); + Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(), 1); } @Test diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java index 77eac3832e..01f429a511 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java +++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java @@ -47,16 +47,6 @@ public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsum _idleTimeoutMs = idleTimeoutMs; } - private boolean isOffsetCaughtUp(StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset latestOffset) { - if (currentOffset != null && latestOffset != null) { - // Kafka's "latest" offset is actually the next available offset. Therefore it will be 1 ahead of the - // current offset in the case we are caught up. - // TODO: implement a way to have this work correctly for kafka consumers - return currentOffset.compareTo(latestOffset) >= 0; - } - return false; - } - private boolean segmentHasBeenIdleLongerThanThreshold(long segmentIdleTime) { return _idleTimeoutMs > 0 && segmentIdleTime > _idleTimeoutMs; } @@ -84,7 +74,7 @@ public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsum // the stream consumer to check partition count if we're already caught up. StreamPartitionMsgOffset currentOffset = rtSegmentDataManager.getCurrentOffset(); StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset(5000); - if (isOffsetCaughtUp(currentOffset, latestStreamOffset)) { + if (isOffsetCaughtUp(segmentName, currentOffset, latestStreamOffset)) { _logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}. " + "But the current ingested offset is equal to the latest available offset {}.", segmentName, freshnessMs, _minFreshnessMs, currentOffset); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java index c6fe0d16d6..18d08dd3d5 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java @@ -29,6 +29,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,4 +136,24 @@ public abstract class IngestionBasedConsumptionStatusChecker { } protected abstract boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager); + + protected boolean isOffsetCaughtUp(String segmentName, + StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset latestOffset) { + if (currentOffset != null && latestOffset != null) { + // Kafka's "latest" offset is actually the next available offset. Therefore it will be 1 ahead of the + // current offset in the case we are caught up. + // TODO: implement a way to have this work correctly for kafka consumers + _logger.info("Checking if segment {} is caught up - current offset: {}, latest offset: {}", 
+ segmentName, currentOffset, latestOffset); + try { + return currentOffset.compareTo(latestOffset) >= 0; + } catch (NullPointerException e) { + // This can happen if the offsets are not comparable, + // E.g. sequence number missing for a Kinesis shard + _logger.info("Unable to compare offsets for segment {} - current offset: {}, latest offset: {}. " + + "Will check consumption status later", segmentName, currentOffset, latestOffset); + } + } + return false; + } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java index ad7d2905ba..b4f2ba12e2 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java @@ -45,18 +45,15 @@ public class OffsetBasedConsumptionStatusChecker extends IngestionBasedConsumpti protected boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager) { StreamPartitionMsgOffset latestIngestedOffset = rtSegmentDataManager.getCurrentOffset(); StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.getLatestStreamOffsetAtStartupTime(); - if (latestStreamOffset == null || latestIngestedOffset == null) { - _logger.info("Null offset found for segment {} - latest stream offset: {}, latest ingested offset: {}. " - + "Will check consumption status later", segmentName, latestStreamOffset, latestIngestedOffset); - return false; - } - if (latestIngestedOffset.compareTo(latestStreamOffset) < 0) { - _logger.info("Latest ingested offset {} in segment {} is smaller than stream latest available offset {} ", - latestIngestedOffset, segmentName, latestStreamOffset); - return false; + + if (isOffsetCaughtUp(segmentName, latestIngestedOffset, latestStreamOffset)) { + _logger.info("Segment {} with latest ingested offset {} has caught up to the latest stream offset {}", + segmentName, latestIngestedOffset, latestStreamOffset); + return true; } - _logger.info("Segment {} with latest ingested offset {} has caught up to the latest stream offset {}", segmentName, - latestIngestedOffset, latestStreamOffset); - return true; + + _logger.info("Latest ingested offset {} in segment {} is smaller than stream latest available offset {} ", + latestIngestedOffset, segmentName, latestStreamOffset); + return false; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 30cbe8bd63..53f0e33ed1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -40,13 +40,16 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { private final List<PartitionGroupConsumptionStatus> _partitionGroupConsumptionStatusList; private Exception _exception; private final List<String> _topicNames; + private final boolean _forceGetOffsetFromStream; public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs, - List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList, + boolean forceGetOffsetFromStream) { 
_topicNames = streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList()); _streamConfigs = streamConfigs; _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; _newPartitionGroupMetadataList = new ArrayList<>(); + _forceGetOffsetFromStream = forceGetOffsetFromStream; } public List<PartitionGroupMetadata> getPartitionGroupMetadataList() { @@ -83,8 +86,8 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { _newPartitionGroupMetadataList.addAll( streamMetadataProvider.computePartitionGroupMetadata(StreamConsumerFactory.getUniqueClientId(clientId), _streamConfigs.get(i), - topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000).stream().map( - metadata -> new PartitionGroupMetadata( + topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, _forceGetOffsetFromStream).stream() + .map(metadata -> new PartitionGroupMetadata( IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( metadata.getPartitionGroupId(), index), metadata.getStartOffset())).collect(Collectors.toList()) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 64770d3f83..66bf9768b5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -21,6 +21,7 @@ package org.apache.pinot.spi.stream; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -98,6 +99,26 @@ public interface StreamMetadataProvider extends Closeable { return newPartitionGroupMetadataList; } + /** + * @param forceGetOffsetFromStream - the flag is a workaround to not use partitionGroupConsumptionStatuses. + * This is required because PinotLLCRealtimeSegmentManager.selectStartOffset() + * actually requires the offsets from the stream, but was originally relying on + * passing an empty partitionGroupConsumptionStatuses to the method. 
+ * The change for <a href="https://github.com/apache/pinot/issues/15608">...</a> + * required to pass the actual partitionGroupConsumptionStatuses + * TODO - Remove the flag and fix the clients calling computePartitionGroupMetadata() + */ + default List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientId, StreamConfig streamConfig, + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, int timeoutMillis, + boolean forceGetOffsetFromStream) + throws IOException, TimeoutException { + if (forceGetOffsetFromStream) { + return computePartitionGroupMetadata(clientId, streamConfig, Collections.emptyList(), timeoutMillis); + } else { + return computePartitionGroupMetadata(clientId, streamConfig, partitionGroupConsumptionStatuses, timeoutMillis); + } + } + default Map<String, PartitionLagState> getCurrentPartitionLagState( Map<String, ConsumerPartitionState> currentPartitionStateMap) { Map<String, PartitionLagState> result = new HashMap<>(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index 48a3e63f75..2804eac53e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -113,6 +113,11 @@ public class ControllerRequestURLBuilder { return StringUtil.join("/", _baseUrl, "periodictask", "run?taskname=" + taskName); } + public String forPeriodTaskRun(String taskName, String tableName, TableType tableType) { + return StringUtil.join("/", _baseUrl, "periodictask", "run?taskname=" + taskName + "&tableName=" + tableName + + "&type=" + tableType); + } + public String forUpdateUserConfig(String username, String componentTypeStr, boolean passwordChanged) { StringBuilder params = new StringBuilder(); if (StringUtils.isNotBlank(username)) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org