npawar opened a new pull request #6518:
URL: https://github.com/apache/incubator-pinot/pull/6518
## Description

Adding a Kinesis connector for Pinot realtime ingestion.

Added a `KinesisConsumerFactory` to provide the Kinesis implementation. Each Kinesis consumer consumes from one shard.

An abstraction has been introduced for a group of partitions/shards, and each consumer is now responsible for a `PartitionGroup` instead of just a partitionId. In this first iteration, a `PartitionGroup` contains only one partition/shard.

Kafka streams should work as before, with no changes needed.

Sample table config:

```
{
  "tableName": "kinesisTest",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "DaysSinceEpoch",
    "schemaName": "airlineStats",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kinesis",
      "aws-region": "<region>",
      "max-records-to-fetch": "20",
      "shard-iterator-type": "AFTER_SEQUENCE_NUMBER",
      "stream.kinesis.consumer.type": "lowlevel",
      "stream.kinesis.topic.name": "kinesis-test",
      "stream.kinesis.fetch.timeout.millis": "10000",
      "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
      "realtime.segment.flush.threshold.size": "10000",
      "realtime.segment.flush.threshold.time": "1h"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
```

Some pending items:

1. Integration test: we have not yet found an easy way to create a test Kinesis stream. @KKcorps is looking into this.
2. Kinesis has shardIds (the equivalent of Kafka partition ids), in the format `shardId-000000000001`. We use the numeric suffix as the "partitionId" in Pinot. However, the Pinot partitionId is an `int`, while Kinesis would need a `long`. This change will be done in a follow-up, since it touches a lot of files and would greatly expand the scope of this review.
3. It would be nice to add a subclass of `KafkaJSONMessageDecoder` named `JSONDecoder`, so that the Kinesis table config doesn't have to refer to a "Kafka" decoder.

## Release Notes

Kinesis connector

## Documentation

Coming soon.
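To make the PartitionGroup idea from the description concrete, here is a minimal Java sketch. It is hypothetical: `PartitionGroupSketch`, the `PartitionGroup` record, and `oneShardPerGroup` are illustrative names, not Pinot's actual classes or signatures. It only models the first-iteration behavior, where each group holds exactly one shard and one consumer is created per group.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionGroupSketch {

    // Hypothetical stand-in for the PartitionGroup abstraction described in the PR;
    // the real Pinot class may differ in name, fields, and methods.
    record PartitionGroup(int groupId, List<String> shardIds) {
        PartitionGroup {
            shardIds = List.copyOf(shardIds); // immutable defensive copy
        }
    }

    // Hypothetical assignment matching the first iteration: exactly one shard per group.
    static List<PartitionGroup> oneShardPerGroup(List<String> shardIds) {
        List<PartitionGroup> groups = new ArrayList<>();
        for (int i = 0; i < shardIds.size(); i++) {
            groups.add(new PartitionGroup(i, List.of(shardIds.get(i))));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> shards = List.of("shardId-000000000000", "shardId-000000000001");
        for (PartitionGroup group : oneShardPerGroup(shards)) {
            // In this model, one consumer would be created per group
            // and would read every shard listed in that group.
            System.out.println("group " + group.groupId() + " -> " + group.shardIds());
        }
    }
}
```

Keying the consumer on a group rather than on a single id leaves room for later iterations to place several shards in one group; this sketch does not model how such a grouping would be chosen.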
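Pending item 2 (int vs. long partitionId) can be illustrated with a small, self-contained Java sketch. It is not code from this PR: `ShardIdSketch`, `parseShardSuffix`, and `toIntPartitionId` are hypothetical names, and the sketch assumes shard IDs always follow the `shardId-` plus 12-digit pattern shown in the sample. A 12-digit suffix can represent values up to 999,999,999,999, which is larger than `Integer.MAX_VALUE` (2,147,483,647).

```java
public class ShardIdSketch {

    // Assumed prefix, based on the sample shard ID format "shardId-000000000001".
    private static final String PREFIX = "shardId-";

    // Parses the numeric suffix of a shard ID; returns a long because
    // a 12-digit suffix can exceed the range of an int.
    static long parseShardSuffix(String shardId) {
        if (!shardId.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Unexpected shard ID format: " + shardId);
        }
        return Long.parseLong(shardId.substring(PREFIX.length()));
    }

    // Narrows to an int partitionId only when the value fits;
    // Math.toIntExact throws ArithmeticException on overflow.
    static int toIntPartitionId(long suffix) {
        return Math.toIntExact(suffix);
    }

    public static void main(String[] args) {
        long small = parseShardSuffix("shardId-000000000001");
        System.out.println(small + " -> " + toIntPartitionId(small)); // prints "1 -> 1"

        long large = parseShardSuffix("shardId-003000000000");
        try {
            toIntPartitionId(large);
        } catch (ArithmeticException e) {
            // prints "3000000000 does not fit in an int partitionId"
            System.out.println(large + " does not fit in an int partitionId");
        }
    }
}
```

Failing loudly on overflow (rather than silently truncating with a cast) is one way to guard an `int` partitionId until the switch to `long` lands.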