npawar opened a new pull request #6518:
URL: https://github.com/apache/incubator-pinot/pull/6518


   ## Description
   Adds a Kinesis connector for Pinot real-time ingestion.
   
   Added a `KinesisConsumerFactory` to provide the Kinesis implementation.
   
   Each Kinesis consumer consumes from a shard. This PR introduces an abstraction for a group of partitions/shards: each consumer is now responsible for a PartitionGroup instead of a single partitionId (although in this first iteration, a PartitionGroup contains only one partition/shard).
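
The PartitionGroup idea can be pictured roughly as follows. This is a minimal sketch under stated assumptions: `PartitionGroupSketch` and its methods are hypothetical names chosen for illustration, not the classes added in this PR. It only shows a consumer owning a group of shard ids, with the first-iteration restriction of exactly one shard per group.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: names are illustrative, not Pinot's actual API.
// A consumer owns a group of shard/partition ids instead of a single partitionId.
final class PartitionGroupSketch {
    private final int groupId;
    private final List<String> shardIds;

    PartitionGroupSketch(int groupId, List<String> shardIds) {
        if (shardIds == null || shardIds.isEmpty()) {
            throw new IllegalArgumentException("A partition group needs at least one shard");
        }
        this.groupId = groupId;
        // Defensive, read-only copy so group membership cannot change underneath a consumer.
        this.shardIds = Collections.unmodifiableList(new ArrayList<>(shardIds));
    }

    // First-iteration shape described in this PR: exactly one shard per group.
    static PartitionGroupSketch ofSingleShard(int groupId, String shardId) {
        return new PartitionGroupSketch(groupId, Collections.singletonList(shardId));
    }

    int groupId() {
        return groupId;
    }

    List<String> shardIds() {
        return shardIds;
    }

    boolean isSingleShard() {
        return shardIds.size() == 1;
    }
}
```

Keeping the shard list inside the group (rather than a bare id) is what would let a later iteration assign several shards to one consumer without changing the consumer's contract.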
   
   Kafka streams should continue to work as before, with no changes needed.
   
   Sample table config:
   ```
   {
     "tableName": "kinesisTest",
     "tableType": "REALTIME",
     "segmentsConfig": {
       "timeColumnName": "DaysSinceEpoch",
       "schemaName": "airlineStats",
       "replicasPerPartition": "1"
     },
     "tenants": {},
     "tableIndexConfig": {
       "loadMode": "MMAP",
       "streamConfigs": {
         "streamType": "kinesis",
         "aws-region": "<region>",
         "max-records-to-fetch": "20",
         "shard-iterator-type": "AFTER_SEQUENCE_NUMBER",
         "stream.kinesis.consumer.type": "lowlevel",
         "stream.kinesis.topic.name": "kinesis-test",
         "stream.kinesis.fetch.timeout.millis": "10000",
         "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
         "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
         "realtime.segment.flush.threshold.size": "10000",
         "realtime.segment.flush.threshold.time": "1h"
       }
     },
     "metadata": {
       "customConfigs": {}
     }
   }
   ```
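
The stream-specific keys in the sample config follow a `stream.<streamType>.<property>` pattern, and the consumer factory is named by its fully qualified class name. The sketch below shows one way such a config could be resolved into a factory instance via reflection. It is an assumption-laden illustration: `StreamConsumerFactorySketch`, `DemoKinesisFactory`, and `StreamConfigSketch` are hypothetical stand-ins, not Pinot's real factory API or `KinesisConsumerFactory`.

```java
import java.util.Map;

// Hypothetical stand-in for a stream consumer factory interface (not Pinot's real API).
interface StreamConsumerFactorySketch {
    String describe();
}

// Illustrative implementation; the real KinesisConsumerFactory lives in the Pinot plugin.
class DemoKinesisFactory implements StreamConsumerFactorySketch {
    public DemoKinesisFactory() {
    }

    @Override
    public String describe() {
        return "kinesis consumer factory (demo)";
    }
}

final class StreamConfigSketch {
    private StreamConfigSketch() {
    }

    // Builds a key of the form stream.<streamType>.<suffix>, matching the sample config.
    static String streamProperty(String streamType, String suffix) {
        return "stream." + streamType + "." + suffix;
    }

    // Reads the factory class name from the config and instantiates it reflectively.
    static StreamConsumerFactorySketch loadFactory(Map<String, String> streamConfigs)
            throws ReflectiveOperationException {
        String streamType = streamConfigs.get("streamType");
        if (streamType == null) {
            throw new IllegalArgumentException("Missing streamType");
        }
        String key = streamProperty(streamType, "consumer.factory.class.name");
        String className = streamConfigs.get(key);
        if (className == null) {
            throw new IllegalArgumentException("Missing " + key);
        }
        // asSubclass fails fast if the named class does not implement the expected interface.
        return Class.forName(className)
                .asSubclass(StreamConsumerFactorySketch.class)
                .getDeclaredConstructor()
                .newInstance();
    }
}
```

Resolving the factory by class name is what lets a Kinesis table and a Kafka table share the same ingestion code path while differing only in configuration.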
   
   Some pending items:
   1. Integration test: we have not yet found an easy way to create a test Kinesis stream. @KKcorps is looking into this.
   2. Kinesis identifies shards by shardIds (the equivalent of Kafka partition ids) of the form `shardId-000000000001`. We use the numeric suffix as the Pinot "partitionId". However, Pinot's partitionId is an `int`, while Kinesis shard suffixes would need a `long`. This change will be made in a follow-up, because it touches many files and would greatly expand the scope of this review.
   3. It would be nice to add a subclass of `KafkaJSONMessageDecoder` named "JSONDecoder", so that the Kinesis table config does not have to refer to a "Kafka" decoder.
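
The shardId-to-partitionId mapping in item 2 can be sketched as below. This is a minimal illustration, assuming the `shardId-` prefix plus 12-digit suffix shown above; `ShardIdParser` and its methods are hypothetical names. Twelve decimal digits can reach 999,999,999,999, well beyond `Integer.MAX_VALUE` (2,147,483,647), which is why the suffix is parsed as a `long` and any narrowing to `int` must be checked.

```java
// Hypothetical helper: maps "shardId-000000000001" to a numeric partition id.
final class ShardIdParser {
    private static final String PREFIX = "shardId-";

    private ShardIdParser() {
    }

    static long partitionIdOf(String shardId) {
        if (shardId == null || !shardId.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Unexpected shard id: " + shardId);
        }
        // Long.parseLong accepts the leading zeros in the numeric suffix.
        return Long.parseLong(shardId.substring(PREFIX.length()));
    }

    // Narrowing to today's int partitionId; Math.toIntExact throws
    // ArithmeticException instead of silently truncating large suffixes.
    static int toIntPartitionId(long partitionId) {
        return Math.toIntExact(partitionId);
    }
}
```

For example, `partitionIdOf("shardId-000000000001")` yields `1L`, while `toIntPartitionId(partitionIdOf("shardId-999999999999"))` throws rather than producing a wrong id.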
   
   ## Release Notes
   Kinesis connector
   
   ## Documentation
   Coming very soon
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

