This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 570a95a2e3a7189615433bbab23c962f6957805c Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Tue Jan 5 10:20:43 2021 -0800 Remove unused classes and changes --- .../apache/pinot/common/utils/CommonConstants.java | 4 - .../helix/core/PinotHelixResourceManager.java | 61 ++++++-------- .../helix/core/PinotTableIdealStateBuilder.java | 10 ++- .../realtime/PinotLLCRealtimeSegmentManager.java | 96 ++++++++++++---------- .../impl/fakestream/FakeStreamConsumerFactory.java | 5 +- .../fakestream/FakeStreamMetadataProvider.java | 8 +- ...lakyConsumerRealtimeClusterIntegrationTest.java | 2 - .../kafka09/KafkaPartitionLevelConsumerTest.java | 4 +- .../kafka20/KafkaPartitionLevelConsumer.java | 1 - .../kafka20/KafkaStreamMetadataProvider.java | 1 - .../plugin/stream/kinesis/KinesisConsumer.java | 2 +- .../org/apache/pinot/spi/stream/FetchResult.java | 24 ------ .../org/apache/pinot/spi/stream/MessageBatch.java | 2 - .../spi/stream/PartitionGroupMetadataList.java | 30 ------- .../org/apache/pinot/spi/stream/StreamConfig.java | 6 +- .../pinot/spi/stream/StreamMetadataProvider.java | 4 +- 16 files changed, 91 insertions(+), 169 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 4e81349..191ae93 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -393,10 +393,6 @@ public class CommonConstants { public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time"; public static final String PARTITION_METADATA = "segment.partition.metadata"; /** - * Serialized {@link org.apache.pinot.spi.stream.PartitionGroupMetadata} for this segment - */ - public static final String PARTITION_GROUP_METADATA = "segment.partition.group.metadata"; - /** * This field is used for parallel push protection to lock the segment globally. * We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the * next upload won't be blocked forever. diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index b2949e7..b50da5f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1351,45 +1351,34 @@ public class PinotHelixResourceManager { IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig)); IdealState idealState = getTableIdealState(realtimeTableName); - - if (streamConfig.isShardedConsumerType()) { - idealState = PinotTableIdealStateBuilder - .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, - _enableBatchMessageMode); - _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); - LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName); - } else { - - if (streamConfig.hasHighLevelConsumerType()) { - if (idealState == null) { - LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName); - idealState = PinotTableIdealStateBuilder - .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager, - _propertyStore, _enableBatchMessageMode); - _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState); - } else { - // Remove LLC segments if it is not configured - if (!streamConfig.hasLowLevelConsumerType()) { - _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState); - } + if (streamConfig.hasHighLevelConsumerType()) { + if (idealState == null) { + LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName); + idealState = PinotTableIdealStateBuilder + .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager, + _propertyStore, _enableBatchMessageMode); + _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState); + } else { + // Remove LLC segments if it is not configured + if (!streamConfig.hasLowLevelConsumerType()) { + _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState); } - // For HLC table, property store entry must exist to trigger watchers to create segments - ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName); } + // For HLC table, property store entry must exist to trigger watchers to create segments + ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName); + } - // Either we have only low-level consumer, or both. - if (streamConfig.hasLowLevelConsumerType()) { - // Will either create idealstate entry, or update the IS entry with new segments - // (unless there are low-level segments already present) - if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) { - idealState = PinotTableIdealStateBuilder - .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, - _enableBatchMessageMode); - _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); - LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName); - } else { - LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName); - } + // Either we have only low-level consumer, or both. + if (streamConfig.hasLowLevelConsumerType()) { + // Will either create idealstate entry, or update the IS entry with new segments + // (unless there are low-level segments already present) + if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) { + PinotTableIdealStateBuilder + .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, + _enableBatchMessageMode); + LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName); + } else { + LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName); } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 8b200bb..68bcf57 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -30,6 +30,7 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.stream.PartitionGroupInfo; import org.apache.pinot.spi.stream.PartitionGroupMetadata; @@ -94,8 +95,9 @@ public class PinotTableIdealStateBuilder { return idealState; } - public static IdealState buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig, - IdealState idealState, boolean enableBatchMessageMode) { + public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, + String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState, + boolean enableBatchMessageMode) { // Validate replicasPerPartition here. final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition(); @@ -104,7 +106,7 @@ public class PinotTableIdealStateBuilder { } final int nReplicas; try { - nReplicas = Integer.parseInt(replicasPerPartitionStr); + nReplicas = Integer.valueOf(replicasPerPartitionStr); } catch (NumberFormatException e) { throw new PinotHelixResourceManager.InvalidTableConfigException( "Invalid value for replicasPerPartition, expected a number: " + replicasPerPartitionStr, e); @@ -112,7 +114,7 @@ public class PinotTableIdealStateBuilder { if (idealState == null) { idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode); } - return idealState; + pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); } public static List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index a6ef625..61ef719 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -24,12 +24,12 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -84,9 +84,7 @@ import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; import org.apache.pinot.spi.stream.PartitionOffsetFetcher; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; -import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; -import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -204,42 +202,6 @@ public class PinotLLCRealtimeSegmentManager { return partitionGroupMetadataList; } - /** - * Sets up the realtime table ideal state for a table of consumer type SHARDED - */ - public void setUpNewTable(TableConfig tableConfig, IdealState idealState) { - Preconditions.checkState(!_isStopping, "Segment manager is stopping"); - - String realtimeTableName = tableConfig.getTableName(); - LOGGER.info("Setting up new SHARDED table: {}", realtimeTableName); - - _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName); - - PartitionLevelStreamConfig streamConfig = - new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); - - // get new partition groups and their metadata - List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList()); - int numPartitionGroups = newPartitionGroupInfoList.size(); - - InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); - int numReplicas = getNumReplicas(tableConfig, instancePartitions); - - SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig); - Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = - Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); - - long currentTimeMs = getCurrentTimeMs(); - Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); - for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) { - String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, - currentTimeMs, instancePartitions, numPartitionGroups, numReplicas); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment, - instancePartitionsMap); - } - setIdealState(realtimeTableName, idealState); - } - public boolean getIsSplitCommitEnabled() { return _controllerConf.getAcceptSplitCommit(); } @@ -274,6 +236,50 @@ public class PinotLLCRealtimeSegmentManager { } /** + * Sets up the initial segments for a new LLC real-time table. + * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC and LLC are configured. + */ + public void setUpNewTable(TableConfig tableConfig, IdealState idealState) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + String realtimeTableName = tableConfig.getTableName(); + LOGGER.info("Setting up new LLC table: {}", realtimeTableName); + + // Make sure all the existing segments are HLC segments + List<String> currentSegments = getAllSegments(realtimeTableName); + for (String segmentName : currentSegments) { + // TODO: Should return 4xx HTTP status code. Currently all exceptions are returning 500 + Preconditions.checkState(SegmentName.isHighLevelConsumerSegmentName(segmentName), + "Cannot set up new LLC table: %s with existing non-HLC segment: %s", realtimeTableName, segmentName); + } + + _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName); + + PartitionLevelStreamConfig streamConfig = + new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); + // get new partition groups and their metadata + List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList()); + int numPartitionGroups = newPartitionGroupInfoList.size(); + + int numReplicas = getNumReplicas(tableConfig, instancePartitions); + + SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig); + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); + + long currentTimeMs = getCurrentTimeMs(); + Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) { + String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, + currentTimeMs, instancePartitions, numPartitionGroups, numReplicas); + updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment, + instancePartitionsMap); + } + setIdealState(realtimeTableName, idealState); + } + + /** * Removes all LLC segments from the given IdealState. */ public void removeLLCSegments(IdealState idealState) { @@ -498,7 +504,7 @@ public class PinotLLCRealtimeSegmentManager { IngestionConfigUtils.getStreamConfigMap(tableConfig)); // find new partition groups [A],[B],[C],[D] - List<PartitionGroupInfo> newPartitionGroupMetadataList = + List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList); // create new segment metadata, only if it is not IN_PROGRESS in the current state @@ -508,7 +514,7 @@ public class PinotLLCRealtimeSegmentManager { List<String> newConsumingSegmentNames = new ArrayList<>(); String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); long newSegmentCreationTimeMs = getCurrentTimeMs(); - for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) { + for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) { int newPartitionGroupId = partitionGroupInfo.getPartitionGroupId(); PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId); if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found. @@ -1162,14 +1168,16 @@ public class PinotLLCRealtimeSegmentManager { return System.currentTimeMillis(); } + // fixme: investigate if this should only return active partitions (i.e. skip a shard if it has reached eol) + // or return all unique partitions found in ideal state right from the birth of the table private int getNumPartitionsFromIdealState(IdealState idealState) { - int numPartitions = 0; + Set<String> uniquePartitions = new HashSet<>(); for (String segmentName : idealState.getRecord().getMapFields().keySet()) { if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) { - numPartitions = Math.max(numPartitions, new LLCSegmentName(segmentName).getPartitionGroupId() + 1); + uniquePartitions.add(String.valueOf(new LLCSegmentName(segmentName).getPartitionGroupId())); } } - return numPartitions; + return uniquePartitions.size(); } private int getNumReplicas(TableConfig tableConfig, InstancePartitions instancePartitions) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java index b0dc7eb..bb01e5c 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.realtime.impl.fakestream; -import java.util.Collections; import java.util.Set; import org.apache.pinot.core.util.IngestionUtils; import org.apache.pinot.spi.config.table.TableConfig; @@ -27,8 +26,6 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.OffsetCriteria; -import org.apache.pinot.spi.stream.PartitionGroupConsumer; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactory; @@ -82,7 +79,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory { // stream metadata provider StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId); - int partitionCount = streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10_000).size(); + int partitionCount = streamMetadataProvider.fetchPartitionCount(10_000); System.out.println(partitionCount); // Partition metadata provider diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java index 61aa01f..e0b8ebd 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java @@ -19,13 +19,9 @@ package org.apache.pinot.core.realtime.impl.fakestream; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import org.apache.pinot.spi.stream.OffsetCriteria; -import org.apache.pinot.spi.stream.PartitionGroupInfo; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -35,12 +31,10 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; * StreamMetadataProvider implementation for the fake stream */ public class FakeStreamMetadataProvider implements StreamMetadataProvider { - private final int _numPartitions; - private StreamConfig _streamConfig; + private int _numPartitions; public FakeStreamMetadataProvider(StreamConfig streamConfig) { _numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig); - _streamConfig = streamConfig; } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java index 4503de0..b05244f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java @@ -22,8 +22,6 @@ import java.lang.reflect.Constructor; import java.util.Random; import java.util.Set; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.stream.PartitionGroupConsumer; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactory; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java index 90dc5ad..beb82e5 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java @@ -267,7 +267,7 @@ public class KafkaPartitionLevelConsumerTest { } @Test - public void testGetPartitionCount() throws Exception { + public void testGetPartitionCount() { String streamType = "kafka"; String streamKafkaTopicName = "theTopic"; String streamKafkaBrokerList = "abcd:1234,bcde:2345"; @@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest { KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory); - Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10000), 2); + Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2); } @Test diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java index 25b1742..f9b4365 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.utils.Bytes; -import org.apache.pinot.spi.stream.Checkpoint; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.PartitionLevelConsumer; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java index 38c49f5..c0e2041 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java @@ -42,7 +42,6 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa } @Override - @Deprecated public int fetchPartitionCount(long timeoutMillis) { return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size(); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index a97f3dc..70d2c8a 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -170,7 +170,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti GetShardIteratorRequest.Builder requestBuilder = GetShardIteratorRequest.builder().streamName(_stream).shardId(shardId).shardIteratorType(_shardIteratorType); - if (sequenceNumber != null) { + if (sequenceNumber != null && _shardIteratorType.toString().contains("SEQUENCE")) { requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber); } return _kinesisClient.getShardIterator(requestBuilder.build()).shardIterator(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java deleted file mode 100644 index 7e8a911..0000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream; - -public interface FetchResult<T> { - Checkpoint getLastCheckpoint(); - MessageBatch<T> getMessages(); -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java index 5af72c0..3052b9e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.spi.stream; -import javax.annotation.Nullable; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; @@ -62,7 +61,6 @@ public interface MessageBatch<T> { * Returns the metadata associated with the message at a particular index. This typically includes the timestamp * when the message was ingested by the upstream stream-provider and other relevant metadata. */ - @Nullable default RowMetadata getMetadataAtIndex(int index) { return null; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java deleted file mode 100644 index 1568d63..0000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream; - -import java.util.List; - - -public interface PartitionGroupMetadataList { - - List<PartitionGroupMetadata> getMetadataList(); - - PartitionGroupMetadata getPartitionGroupMetadata(int index); - -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java index a3e359e..d343203 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java @@ -41,7 +41,7 @@ public class StreamConfig { * The type of the stream consumer either HIGHLEVEL or LOWLEVEL. For backward compatibility, adding SIMPLE which is equivalent to LOWLEVEL */ public enum ConsumerType { - HIGHLEVEL, LOWLEVEL, SHARDED + HIGHLEVEL, LOWLEVEL } public static final int DEFAULT_FLUSH_THRESHOLD_ROWS = 5_000_000; @@ -273,10 +273,6 @@ public class StreamConfig { return _consumerTypes.contains(ConsumerType.LOWLEVEL); } - public boolean isShardedConsumerType() { - return _consumerTypes.size() == 1 && _consumerTypes.get(0).equals(ConsumerType.SHARDED); - } - public String getConsumerFactoryClassName() { return _consumerFactoryClassName; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 572cd02..c64f710 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -36,9 +36,9 @@ import org.apache.pinot.spi.annotations.InterfaceStability; public interface StreamMetadataProvider extends Closeable { /** * Fetches the number of partitions for a topic given the stream configs - * @deprecated use getPartitionGroupMetadataList instead + * @param timeoutMillis + * @return */ - @Deprecated int fetchPartitionCount(long timeoutMillis); // Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org