This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new c24f1cc  Enhance Pinot streaming consumer interface (#8047)
c24f1cc is described below

commit c24f1cc9f712bd7e8c804cd50198af450b95b0dc
Author:     Navina Ramesh <navi.trin...@gmail.com>
AuthorDate: Fri Jan 21 09:22:12 2022 -0800

    Enhance Pinot streaming consumer interface (#8047)

    * Introduces new interface methods - start, commit and rollback in PartitionGroupConsumer;
      bumping up grpc version to 1.41.0

      removed unnecessary final class members; docs for PartitionGroupConsumer APIs

    * incorporating changes from OSS discussions

    * Addressing comments from PR

    * bumping up presto-pinot-driver to grpc 1.40.0 version

    Co-authored-by: Navina Ramesh <nav...@apache.org>
---
 pinot-connectors/presto-pinot-driver/pom.xml       |  2 +-
 .../realtime/LLRealtimeSegmentDataManager.java     | 15 ++++----
 .../pinot/spi/stream/PartitionGroupConsumer.java   | 40 ++++++++++++++++++++--
 .../pinot/spi/stream/StreamDataProducer.java       |  2 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  6 ++--
 .../pinot/tools/streams/MeetupRsvpJsonStream.java  |  4 +--
 .../pinot/tools/streams/MeetupRsvpStream.java      | 12 +++++--
 pom.xml                                            |  2 +-
 8 files changed, 64 insertions(+), 19 deletions(-)

diff --git a/pinot-connectors/presto-pinot-driver/pom.xml b/pinot-connectors/presto-pinot-driver/pom.xml
index 3a2f35e..05d4490 100644
--- a/pinot-connectors/presto-pinot-driver/pom.xml
+++ b/pinot-connectors/presto-pinot-driver/pom.xml
@@ -536,7 +536,7 @@
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-api</artifactId>
-      <version>1.30.0</version>
+      <version>1.41.0</version>
       <exclusions>
         <exclusion>
           <groupId>com.google.guava</groupId>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 5233ff2..378b0e8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -251,7 +251,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private static final int MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS = 31;
 
   private Thread _consumerThread;
-  private final String _streamTopic;
   private final int _partitionGroupId;
   private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus;
   final String _clientId;
@@ -664,6 +663,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           break;
         case COMMIT:
           _state = State.COMMITTING;
+          _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
           long buildTimeSeconds = response.getBuildTimeSeconds();
           buildSegmentForCommit(buildTimeSeconds * 1000L);
           if (_segmentBuildDescriptor == null) {
@@ -1232,7 +1232,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _streamConsumerFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig);
     _streamPartitionMsgOffsetFactory =
         StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory();
-    _streamTopic = _partitionLevelStreamConfig.getTopicName();
+    String streamTopic = _partitionLevelStreamConfig.getTopicName();
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _llcSegmentName = llcSegmentName;
     _partitionGroupId = _llcSegmentName.getPartitionGroupId();
@@ -1244,9 +1244,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         _segmentZKMetadata.getStatus().toString());
     _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
-    _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId;
+    _metricKeyName = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId;
     _segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
-    _tableStreamName = _tableNameWithType + "_" + _streamTopic;
+    _tableStreamName = _tableNameWithType + "_" + streamTopic;
     _memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
         indexLoadingConfig.isRealtimeOffHeapAllocation(), indexLoadingConfig.isDirectRealtimeOffHeapAllocation(),
         serverMetrics);
@@ -1307,7 +1307,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       String consumerDir = realtimeTableDataManager.getConsumerDir();
       RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
           new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentNameStr)
-              .setStreamName(_streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName)
+              .setStreamName(streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName)
              .setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
               .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
               .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
@@ -1325,7 +1325,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Create message decoder
     Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
     _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
-    _clientId = _streamTopic + "-" + _partitionGroupId;
+    _clientId = streamTopic + "-" + _partitionGroupId;
 
     // Create record transformer
     _recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
@@ -1465,9 +1465,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     if (_partitionGroupConsumer != null) {
       closePartitionGroupConsumer();
     }
-    _segmentLogger.info("Creating new stream consumer, reason: {}", reason);
+    _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    _partitionGroupConsumer.start(_partitionGroupConsumptionStatus.getStartOffset());
   }
 
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index 735c91d..8e9308e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -21,14 +21,33 @@ package org.apache.pinot.spi.stream;
 import java.io.Closeable;
 import java.util.concurrent.TimeoutException;
 
-
 /**
  * Consumer interface for consuming from a partition group of a stream
  */
 public interface PartitionGroupConsumer extends Closeable {
+  /**
+   * Starts a stream consumer
+   *
+   * This is useful in cases where starting the consumer involves preparing / initializing the source.
+   * A typical example is that of an asynchronous / non-poll based consumption model where this method will be used to
+   * setup or initialize the consumer to fetch messages from the source stream.
+   *
+   * Poll-based consumers can optionally use this to prefetch metadata from the source.
+   *
+   * This method should be invoked by the caller before trying to invoke
+   * {@link #fetchMessages(StreamPartitionMsgOffset, StreamPartitionMsgOffset, int)}.
+   *
+   * @param startOffset Offset (inclusive) at which the consumption should begin
+   */
+  default void start(StreamPartitionMsgOffset startOffset) {
+
+  }
+
   /**
-   * Fetch messages and offsets from the stream partition group
+   * Return messages from the stream partition group within the specified timeout
+   *
+   * The message may be fetched by actively polling the source or by retrieving from a pre-fetched buffer. This depends
+   * on the implementation.
    *
    * @param startOffset The offset of the first message desired, inclusive
    * @param endOffset The offset of the last message desired, exclusive, or null
@@ -39,4 +58,21 @@ public interface PartitionGroupConsumer extends Closeable {
    */
   MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, int timeoutMs)
       throws TimeoutException;
+
+  /**
+   * Checkpoints the consumption state of the stream partition group in the source
+   *
+   * This is useful in streaming systems that require preserving consumption state on the source in order to resume or
+   * replay consumption of data. The consumer implementation is responsible for managing this state.
+   *
+   * The offset returned will be used for offset comparisons within the local server (say, for catching up) and also,
+   * persisted to the ZK segment metadata. Hence, the returned value should be same or equivalent to the lastOffset
+   * provided as input (that is, compareTo of the input and returned offset should be 0).
+   *
+   * @param lastOffset checkpoint the stream at this offset (exclusive)
+   * @return Returns the offset that should be used as the next upcoming offset for the stream partition group
+   */
+  default StreamPartitionMsgOffset checkpoint(StreamPartitionMsgOffset lastOffset) {
+    return lastOffset;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
index ed19f86..6b3c140 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
@@ -22,7 +22,7 @@ import java.util.Properties;
 
 /**
- * StreamDataServerStartable is the interface for stream data sources. E.g. KafkaServerStartable, KinesisServerStarable.
+ * StreamDataProducer is the interface for stream data sources. E.g. KafkaDataProducer.
  */
 public interface StreamDataProducer {
   void init(Properties props);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 0947b2f..4a6f3ff 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -487,9 +487,9 @@ public class CommonConstants {
      */
     public enum CompletionMode {
       // default behavior - if the in memory segment in the non-winner server is equivalent to the committed
-      // segment, then build and
-      // replace, else download
-      DEFAULT, // non-winner servers always download the segment, never build it
+      // segment, then build and replace, else download
+      DEFAULT,
+      // non-winner servers always download the segment, never build it
       DOWNLOAD
     }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
index 624d277..586bfa7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
@@ -45,12 +45,12 @@ public class MeetupRsvpJsonStream extends MeetupRsvpStream {
         try {
           JsonNode messageJson = JsonUtils.stringToJsonNode(message);
           String rsvpId = messageJson.get("rsvp_id").asText();
-          _producer.produce("meetupRSVPEvents", rsvpId.getBytes(UTF_8), message.getBytes(UTF_8));
+          _producer.produce(_topicName, rsvpId.getBytes(UTF_8), message.getBytes(UTF_8));
         } catch (Exception e) {
           LOGGER.error("Caught exception while processing the message: {}", message, e);
         }
       } else {
-        _producer.produce("meetupRSVPEvents", message.getBytes(UTF_8));
+        _producer.produce(_topicName, message.getBytes(UTF_8));
       }
     }
   };
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index fd37117..5aa8abe 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -40,6 +40,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 public class MeetupRsvpStream {
   protected static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class);
+  private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents";
 
+  protected String _topicName = DEFAULT_TOPIC_NAME;
   protected final boolean _partitionByKey;
   protected final StreamDataProducer _producer;
@@ -63,6 +65,12 @@ public class MeetupRsvpStream {
     _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
   }
 
+  public MeetupRsvpStream(boolean partitionByKey, StreamDataProducer producer, String topicName) {
+    _partitionByKey = partitionByKey;
+    _producer = producer;
+    _topicName = topicName;
+  }
+
   public void run()
       throws Exception {
     _client = ClientManager.createClient();
@@ -117,10 +125,10 @@ public class MeetupRsvpStream {
 
           if (_keepPublishing) {
             if (_partitionByKey) {
-              _producer.produce("meetupRSVPEvents", eventId.getBytes(UTF_8),
+              _producer.produce(_topicName, eventId.getBytes(UTF_8),
                   extractedJson.toString().getBytes(UTF_8));
             } else {
-              _producer.produce("meetupRSVPEvents", extractedJson.toString().getBytes(UTF_8));
+              _producer.produce(_topicName, extractedJson.toString().getBytes(UTF_8));
             }
           }
         } catch (Exception e) {
diff --git a/pom.xml b/pom.xml
index 426936c..1d38cf3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -164,7 +164,7 @@
       TODO: figure out a way to inject kafka dependency instead of explicitly setting the kafka module dependency -->
     <kafka.version>2.0</kafka.version>
     <protobuf.version>3.12.0</protobuf.version>
-    <grpc.version>1.30.0</grpc.version>
+    <grpc.version>1.41.0</grpc.version>
     <!-- Checkstyle violation prop.-->
     <checkstyle.violation.severity>warning</checkstyle.violation.severity>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
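
For stream connector authors, a minimal sketch of how the new PartitionGroupConsumer lifecycle hooks might fit together. This is illustrative only and not part of this commit: MyStreamClient and MyMessageBatch are hypothetical placeholders standing in for a real source SDK and a MessageBatch implementation.

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

public class MyPartitionGroupConsumer implements PartitionGroupConsumer {
  // Hypothetical client for the underlying stream (stands in for a real source SDK)
  private final MyStreamClient _client;

  public MyPartitionGroupConsumer(MyStreamClient client) {
    _client = client;
  }

  @Override
  public void start(StreamPartitionMsgOffset startOffset) {
    // Invoked once before the first fetchMessages() call. For a non-poll based
    // source this could open a session or kick off an async prefetch loop
    // positioned at startOffset (inclusive).
    _client.seek(startOffset);
  }

  @Override
  public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset,
      int timeoutMs)
      throws TimeoutException {
    // May poll the source directly, or drain a buffer filled by the loop
    // started in start(); MyMessageBatch is a hypothetical MessageBatch impl.
    return new MyMessageBatch(_client.read(startOffset, endOffset, timeoutMs));
  }

  @Override
  public StreamPartitionMsgOffset checkpoint(StreamPartitionMsgOffset lastOffset) {
    // Persist consumption state on the source for streams that need it. The
    // returned offset must compare equal to lastOffset (compareTo == 0), since
    // it is persisted to the ZK segment metadata.
    _client.commitState(lastOffset);
    return lastOffset;
  }

  @Override
  public void close()
      throws IOException {
    _client.close();
  }
}

Because start() and checkpoint() ship as default methods (no-op and identity, respectively), existing poll-based consumers remain source-compatible; only sources that need explicit initialization or source-side checkpointing have to override them.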
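Similarly, the new three-argument MeetupRsvpStream constructor lets demo code publish to a topic other than the default meetupRSVPEvents. A usage sketch under assumptions: the broker address, topic name, and the "metadata.broker.list" property key mirror the existing quickstart setup but are illustrative here, and the import paths follow the surrounding modules.

import java.util.Properties;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.tools.streams.MeetupRsvpStream;
import org.apache.pinot.tools.utils.KafkaStarterUtils;

public class CustomTopicRsvpDemo {
  public static void main(String[] args)
      throws Exception {
    Properties properties = new Properties();
    // Assumed local broker address; adjust for your environment.
    properties.put("metadata.broker.list", "localhost:19092");

    StreamDataProducer producer =
        StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);

    // New constructor from this commit: partitionByKey, producer, topic name.
    MeetupRsvpStream stream = new MeetupRsvpStream(true, producer, "myRsvpTopic");
    stream.run();
  }
}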