This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e532ec  Fix backward incompatibility in StreamFactoryConsumerProvider 
(#5557)
5e532ec is described below

commit 5e532eccd8f0898b0b13eef32b9f5335682a037e
Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
AuthorDate: Fri Jun 12 17:32:32 2020 -0700

    Fix backward incompatibility in StreamFactoryConsumerProvider (#5557)
    
    * Fix backward incompatibility in StreamFactoryConsumerProvider
    
    PR #5542 introduced a backward incompatibility in this class that
    would cause existing stream implementations to break when they use
    Pinot 0.5.0. This commit re-introduces the deleted method.
    
    * Eliminated the new factory provider for the offset parser
    
    Changed the StreamConsumerFactory to also provide the offset factory.
    This is a much easier way to keep backward compatibility, and it reduces complexity.
---
 .../core/realtime/PinotLLCRealtimeSegmentManager.java |  6 ++++--
 .../helix/core/realtime/SegmentCompletionManager.java |  2 +-
 .../realtime/HLRealtimeSegmentDataManager.java        |  2 +-
 .../realtime/LLRealtimeSegmentDataManager.java        |  5 +++--
 .../impl/fakestream/FakeStreamConsumerFactory.java    |  2 +-
 .../pinot/core/realtime/stream/StreamConfigTest.java  |  2 --
 .../kafka20/KafkaPartitionLevelConsumerTest.java      |  2 +-
 .../pinot/spi/stream/PartitionCountFetcher.java       |  2 +-
 .../pinot/spi/stream/PartitionOffsetFetcher.java      |  2 +-
 .../org/apache/pinot/spi/stream/StreamConfig.java     | 12 ------------
 .../pinot/spi/stream/StreamConsumerFactory.java       |  4 ++++
 .../spi/stream/StreamConsumerFactoryProvider.java     | 19 +------------------
 12 files changed, 18 insertions(+), 42 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 32b6aea..9ea0613 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -508,7 +508,8 @@ public class PinotLLCRealtimeSegmentManager {
       int numPartitions, int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     String segmentName = newLLCSegmentName.getSegmentName();
-    StreamPartitionMsgOffsetFactory offsetFactory = 
StreamConsumerFactoryProvider.createOffsetFactory(streamConfig);
+    StreamPartitionMsgOffsetFactory offsetFactory =
+        
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
     StreamPartitionMsgOffset startOffset = 
offsetFactory.create(committingSegmentDescriptor.getNextOffset());
     LOGGER
         .info("Creating segment ZK metadata for new CONSUMING segment: {} with 
start offset: {} and creation time: {}",
@@ -830,7 +831,8 @@ public class PinotLLCRealtimeSegmentManager {
 
     Map<String, Map<String, String>> instanceStatesMap = 
idealState.getRecord().getMapFields();
     long currentTimeMs = getCurrentTimeMs();
-    StreamPartitionMsgOffsetFactory offsetFactory = 
StreamConsumerFactoryProvider.createOffsetFactory(streamConfig);
+    StreamPartitionMsgOffsetFactory offsetFactory =
+        
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
 
     // Get the latest segment ZK metadata for each partition
     Map<Integer, LLCRealtimeSegmentZKMetadata> latestSegmentZKMetadataMap =
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 1424b17..3353a6b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -124,7 +124,7 @@ public class SegmentCompletionManager {
     TableConfig tableConfig = 
_segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
     PartitionLevelStreamConfig streamConfig =
         new PartitionLevelStreamConfig(tableConfig.getTableName(), 
tableConfig.getIndexingConfig().getStreamConfigs());
-    return StreamConsumerFactoryProvider.createOffsetFactory(streamConfig);
+    return 
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
   }
 
   // We need to make sure that we never create multiple FSMs for the same 
segment
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index a231d2e..1e3ba88 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -167,7 +167,7 @@ public class HLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       _resourceTmpDir.mkdirs();
     }
     // create and init stream level consumer
-    StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.createConsumerFactory(_streamConfig);
+    StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(_streamConfig);
     String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" 
+ _streamConfig.getTopicName();
     _streamLevelConsumer = streamConsumerFactory
         .createStreamLevelConsumer(clientId, _tableNameWithType, 
SchemaUtils.extractSourceFields(schema),
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 6dcef0a..4c8e6ea 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1106,8 +1106,9 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     // TODO Validate configs
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
     _partitionLevelStreamConfig = new 
PartitionLevelStreamConfig(_tableNameWithType, 
indexingConfig.getStreamConfigs());
-    _streamConsumerFactory = 
StreamConsumerFactoryProvider.createConsumerFactory(_partitionLevelStreamConfig);
-    _streamPartitionMsgOffsetFactory = 
StreamConsumerFactoryProvider.createOffsetFactory(_partitionLevelStreamConfig);
+    _streamConsumerFactory = 
StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig);
+    _streamPartitionMsgOffsetFactory =
+        
StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory();
     _streamTopic = _partitionLevelStreamConfig.getTopicName();
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _llcSegmentName = llcSegmentName;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 6598f19..926cdec 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -74,7 +74,7 @@ public class FakeStreamConsumerFactory extends 
StreamConsumerFactory {
     StreamConfig streamConfig = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions);
 
     // stream consumer factory
-    StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.createConsumerFactory(streamConfig);
+    StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
 
     // stream metadata provider
     StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(clientId);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index e4a8178..af09f30 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -125,8 +125,6 @@ public class StreamConfigTest {
     Assert.assertFalse(exception);
     Assert.assertEquals(streamConfig.getConsumerFactoryClassName(),
         StreamConfig.DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING);
-    Assert.assertEquals(streamConfig.getPartitionOffsetFactoryClassName(),
-        StreamConfig.DEFAULT_PARTITION_OFFSET_FACTORY_CLASS_NAME_STRING);
 
     // Missing decoder class
     streamConfigMap.put(StreamConfigProperties
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index afab33d..9b20a77 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -269,7 +269,7 @@ public class KafkaPartitionLevelConsumerTest {
     streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
     StreamConfig streamConfig = new StreamConfig(tableNameWithType, 
streamConfigMap);
 
-    final StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.createConsumerFactory(streamConfig);
+    final StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
     int numPartitions =
         new KafkaStreamMetadataProvider(clientId, 
streamConfig).fetchPartitionCount(10000);
     for (int partition = 0; partition < numPartitions; partition++) {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
index b656ed2..d523235 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
@@ -38,7 +38,7 @@ public class PartitionCountFetcher implements 
Callable<Boolean> {
 
   public PartitionCountFetcher(StreamConfig streamConfig) {
     _streamConfig = streamConfig;
-    _streamConsumerFactory = 
StreamConsumerFactoryProvider.createConsumerFactory(_streamConfig);
+    _streamConsumerFactory = 
StreamConsumerFactoryProvider.create(_streamConfig);
     _topicName = streamConfig.getTopicName();
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
index ed19084..1d50160 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
@@ -44,7 +44,7 @@ public class PartitionOffsetFetcher implements 
Callable<Boolean> {
     _offsetCriteria = offsetCriteria;
     _partitionId = partitionId;
     _streamConfig = streamConfig;
-    _streamConsumerFactory = 
StreamConsumerFactoryProvider.createConsumerFactory(streamConfig);
+    _streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
     _topicName = streamConfig.getTopicName();
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index d3e7599..817bcd5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -51,7 +51,6 @@ public class StreamConfig {
 
   public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
       "org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory";
-  public static final String 
DEFAULT_PARTITION_OFFSET_FACTORY_CLASS_NAME_STRING = 
LongMsgOffsetFactory.class.getName();
 
   public static final long DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS = 30_000;
   public static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS = 5_000;
@@ -66,7 +65,6 @@ public class StreamConfig {
   private final OffsetCriteria _offsetCriteria;
   private final String _decoderClass;
   private final Map<String, String> _decoderProperties = new HashMap<>();
-  private final String _partitionOffsetFactoryClassName;
 
   private final long _connectionTimeoutMillis;
   private final int _fetchTimeoutMillis;
@@ -121,12 +119,6 @@ public class StreamConfig {
       _offsetCriteria = new 
OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest();
     }
 
-    String partitionOffsetClassKey = 
StreamConfigProperties.constructStreamProperty(_type,
-        StreamConfigProperties.PARTITION_MSG_OFFSET_FACTORY_CLASS);
-    // For backward compatibility, the offset factory class is for handling 
kafka offsets (long type)
-    _partitionOffsetFactoryClassName = 
streamConfigMap.getOrDefault(partitionOffsetClassKey,
-        DEFAULT_PARTITION_OFFSET_FACTORY_CLASS_NAME_STRING);
-
     String decoderClassKey =
         StreamConfigProperties.constructStreamProperty(_type, 
StreamConfigProperties.STREAM_DECODER_CLASS);
     _decoderClass = streamConfigMap.get(decoderClassKey);
@@ -269,10 +261,6 @@ public class StreamConfig {
     return _consumerFactoryClassName;
   }
 
-  public String getPartitionOffsetFactoryClassName() {
-    return _partitionOffsetFactoryClassName;
-  }
-
   public OffsetCriteria getOffsetCriteria() {
     return _offsetCriteria;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index c4f7c9d..27205c9 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -69,4 +69,8 @@ public abstract class StreamConsumerFactory {
    * @return
    */
   public abstract StreamMetadataProvider createStreamMetadataProvider(String 
clientId);
+
+  public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() {
+    return new LongMsgOffsetFactory();
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactoryProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactoryProvider.java
index 6cc3364..fbdda1b 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactoryProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactoryProvider.java
@@ -25,13 +25,12 @@ import org.apache.pinot.spi.plugin.PluginManager;
  * Provider class for {@link StreamConsumerFactory}
  */
 public abstract class StreamConsumerFactoryProvider {
-
   /**
    * Constructs the {@link StreamConsumerFactory} using the {@link 
StreamConfig::getConsumerFactoryClassName()} property and initializes it
    * @param streamConfig
    * @return
    */
-  public static StreamConsumerFactory createConsumerFactory(StreamConfig 
streamConfig) {
+  public static StreamConsumerFactory create(StreamConfig streamConfig) {
     StreamConsumerFactory factory = null;
     try {
       factory = 
PluginManager.get().createInstance(streamConfig.getConsumerFactoryClassName());
@@ -41,20 +40,4 @@ public abstract class StreamConsumerFactoryProvider {
     factory.init(streamConfig);
     return factory;
   }
-
-  /**
-   * Cronstructs the {@link StreamPartitionMsgOffsetFactory} using {@link 
StreamConfig::getPartitionOffsetFactoryClassName}
-   * and initializes it
-   * @param streamConfig
-   */
-  public static StreamPartitionMsgOffsetFactory 
createOffsetFactory(StreamConfig streamConfig) {
-    StreamPartitionMsgOffsetFactory factory = null;
-    try {
-      factory = 
PluginManager.get().createInstance(streamConfig.getPartitionOffsetFactoryClassName());
-    } catch (Exception e) {
-      ExceptionUtils.rethrow(e);
-    }
-    factory.init(streamConfig);
-    return factory;
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to