This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5e532ec Fix backward incompatibility in StreamFactoryConsumerProvider (#5557) 5e532ec is described below commit 5e532eccd8f0898b0b13eef32b9f5335682a037e Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com> AuthorDate: Fri Jun 12 17:32:32 2020 -0700 Fix backward incompatibility in StreamFactoryConsumerProvider (#5557) * Fix backward incompatibility in StreamFactoryConsumerProvider PR #5542 introduced a backward incompatibility in this class that would cause existing stream implementations to break when they use Pinot 0.5.0. This commit re-introduces the deleted method. * Eliminated a new factory provider for offset parser Changed the StreamConsumerFactory to also provide the offset factory. This is a much easier way to keep backward compatibility, and it reduces complexity. --- .../core/realtime/PinotLLCRealtimeSegmentManager.java | 6 ++++-- .../helix/core/realtime/SegmentCompletionManager.java | 2 +- .../realtime/HLRealtimeSegmentDataManager.java | 2 +- .../realtime/LLRealtimeSegmentDataManager.java | 5 +++-- .../impl/fakestream/FakeStreamConsumerFactory.java | 2 +- .../pinot/core/realtime/stream/StreamConfigTest.java | 2 -- .../kafka20/KafkaPartitionLevelConsumerTest.java | 2 +- .../pinot/spi/stream/PartitionCountFetcher.java | 2 +- .../pinot/spi/stream/PartitionOffsetFetcher.java | 2 +- .../org/apache/pinot/spi/stream/StreamConfig.java | 12 ------------ .../pinot/spi/stream/StreamConsumerFactory.java | 4 ++++ .../spi/stream/StreamConsumerFactoryProvider.java | 19 +------------------ 12 files changed, 18 insertions(+), 42 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 32b6aea..9ea0613 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -508,7 +508,8 @@ public class PinotLLCRealtimeSegmentManager { int numPartitions, int numReplicas) { String realtimeTableName = tableConfig.getTableName(); String segmentName = newLLCSegmentName.getSegmentName(); - StreamPartitionMsgOffsetFactory offsetFactory = StreamConsumerFactoryProvider.createOffsetFactory(streamConfig); + StreamPartitionMsgOffsetFactory offsetFactory = + StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); StreamPartitionMsgOffset startOffset = offsetFactory.create(committingSegmentDescriptor.getNextOffset()); LOGGER .info("Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}", @@ -830,7 +831,8 @@ public class PinotLLCRealtimeSegmentManager { Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); long currentTimeMs = getCurrentTimeMs(); - StreamPartitionMsgOffsetFactory offsetFactory = StreamConsumerFactoryProvider.createOffsetFactory(streamConfig); + StreamPartitionMsgOffsetFactory offsetFactory = + StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); // Get the latest segment ZK metadata for each partition Map<Integer, LLCRealtimeSegmentZKMetadata> latestSegmentZKMetadataMap = diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index 1424b17..3353a6b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java 
@@ -124,7 +124,7 @@ public class SegmentCompletionManager { TableConfig tableConfig = _segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName)); PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), tableConfig.getIndexingConfig().getStreamConfigs()); - return StreamConsumerFactoryProvider.createOffsetFactory(streamConfig); + return StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); } // We need to make sure that we never create multiple FSMs for the same segment diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java index a231d2e..1e3ba88 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java @@ -167,7 +167,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _resourceTmpDir.mkdirs(); } // create and init stream level consumer - StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.createConsumerFactory(_streamConfig); + StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + _streamConfig.getTopicName(); _streamLevelConsumer = streamConsumerFactory .createStreamLevelConsumer(clientId, _tableNameWithType, SchemaUtils.extractSourceFields(schema), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 6dcef0a..4c8e6ea 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1106,8 +1106,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // TODO Validate configs IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); _partitionLevelStreamConfig = new PartitionLevelStreamConfig(_tableNameWithType, indexingConfig.getStreamConfigs()); - _streamConsumerFactory = StreamConsumerFactoryProvider.createConsumerFactory(_partitionLevelStreamConfig); - _streamPartitionMsgOffsetFactory = StreamConsumerFactoryProvider.createOffsetFactory(_partitionLevelStreamConfig); + _streamConsumerFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig); + _streamPartitionMsgOffsetFactory = + StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory(); _streamTopic = _partitionLevelStreamConfig.getTopicName(); _segmentNameStr = _segmentZKMetadata.getSegmentName(); _llcSegmentName = llcSegmentName; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java index 6598f19..926cdec 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java @@ -74,7 +74,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory { StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions); // stream consumer factory - StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.createConsumerFactory(streamConfig); + StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig); // stream metadata provider StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java index e4a8178..af09f30 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java @@ -125,8 +125,6 @@ public class StreamConfigTest { Assert.assertFalse(exception); Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), StreamConfig.DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING); - Assert.assertEquals(streamConfig.getPartitionOffsetFactoryClassName(), - StreamConfig.DEFAULT_PARTITION_OFFSET_FACTORY_CLASS_NAME_STRING); // Missing decoder class streamConfigMap.put(StreamConfigProperties diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java index afab33d..9b20a77 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java @@ -269,7 +269,7 @@ public class KafkaPartitionLevelConsumerTest { streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass"); StreamConfig streamConfig = new StreamConfig(tableNameWithType, streamConfigMap); - final StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.createConsumerFactory(streamConfig); + final 
StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); int numPartitions = new KafkaStreamMetadataProvider(clientId, streamConfig).fetchPartitionCount(10000); for (int partition = 0; partition < numPartitions; partition++) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java index b656ed2..d523235 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java @@ -38,7 +38,7 @@ public class PartitionCountFetcher implements Callable<Boolean> { public PartitionCountFetcher(StreamConfig streamConfig) { _streamConfig = streamConfig; - _streamConsumerFactory = StreamConsumerFactoryProvider.createConsumerFactory(_streamConfig); + _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); _topicName = streamConfig.getTopicName(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java index ed19084..1d50160 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java @@ -44,7 +44,7 @@ public class PartitionOffsetFetcher implements Callable<Boolean> { _offsetCriteria = offsetCriteria; _partitionId = partitionId; _streamConfig = streamConfig; - _streamConsumerFactory = StreamConsumerFactoryProvider.createConsumerFactory(streamConfig); + _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); _topicName = streamConfig.getTopicName(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java index d3e7599..817bcd5 100644 --- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java @@ -51,7 +51,6 @@ public class StreamConfig { public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING = "org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory"; - public static final String DEFAULT_PARTITION_OFFSET_FACTORY_CLASS_NAME_STRING = LongMsgOffsetFactory.class.getName(); public static final long DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS = 30_000; public static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS = 5_000; @@ -66,7 +65,6 @@ public class StreamConfig { private final OffsetCriteria _offsetCriteria; private final String _decoderClass; private final Map<String, String> _decoderProperties = new HashMap<>(); - private final String _partitionOffsetFactoryClassName; private final long _connectionTimeoutMillis; private final int _fetchTimeoutMillis; @@ -121,12 +119,6 @@ public class StreamConfig { _offsetCriteria = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest(); } - String partitionOffsetClassKey = StreamConfigProperties.constructStreamProperty(_type, - StreamConfigProperties.PARTITION_MSG_OFFSET_FACTORY_CLASS); - // For backward compatibility, the offset factory class is for handling kafka offsets (long type) - _partitionOffsetFactoryClassName = streamConfigMap.getOrDefault(partitionOffsetClassKey, - DEFAULT_PARTITION_OFFSET_FACTORY_CLASS_NAME_STRING); - String decoderClassKey = StreamConfigProperties.constructStreamProperty(_type, StreamConfigProperties.STREAM_DECODER_CLASS); _decoderClass = streamConfigMap.get(decoderClassKey); @@ -269,10 +261,6 @@ public class StreamConfig { return _consumerFactoryClassName; } - public String getPartitionOffsetFactoryClassName() { - return _partitionOffsetFactoryClassName; - } - public OffsetCriteria getOffsetCriteria() { return _offsetCriteria; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java index c4f7c9d..27205c9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java @@ -69,4 +69,8 @@ public abstract class StreamConsumerFactory { * @return */ public abstract StreamMetadataProvider createStreamMetadataProvider(String clientId); + + public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() { + return new LongMsgOffsetFactory(); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactoryProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactoryProvider.java index 6cc3364..fbdda1b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactoryProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactoryProvider.java @@ -25,13 +25,12 @@ import org.apache.pinot.spi.plugin.PluginManager; * Provider class for {@link StreamConsumerFactory} */ public abstract class StreamConsumerFactoryProvider { - /** * Constructs the {@link StreamConsumerFactory} using the {@link StreamConfig::getConsumerFactoryClassName()} property and initializes it * @param streamConfig * @return */ - public static StreamConsumerFactory createConsumerFactory(StreamConfig streamConfig) { + public static StreamConsumerFactory create(StreamConfig streamConfig) { StreamConsumerFactory factory = null; try { factory = PluginManager.get().createInstance(streamConfig.getConsumerFactoryClassName()); @@ -41,20 +40,4 @@ public abstract class StreamConsumerFactoryProvider { factory.init(streamConfig); return factory; } - - /** - * Cronstructs the {@link StreamPartitionMsgOffsetFactory} using {@link StreamConfig::getPartitionOffsetFactoryClassName} - * and initializes it - * @param streamConfig - */ - public static StreamPartitionMsgOffsetFactory 
createOffsetFactory(StreamConfig streamConfig) { - StreamPartitionMsgOffsetFactory factory = null; - try { - factory = PluginManager.get().createInstance(streamConfig.getPartitionOffsetFactoryClassName()); - } catch (Exception e) { - ExceptionUtils.rethrow(e); - } - factory.init(streamConfig); - return factory; - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org