This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f60bfc8  Add support for Kinesis datasource in Github Events recipe 
(#8333)
f60bfc8 is described below

commit f60bfc8df6172c7d00f7ca9bd9d2995dde8d33e3
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Fri Mar 18 00:07:03 2022 +0530

    Add support for Kinesis datasource in Github Events recipe (#8333)
    
    * Add support for Kinesis datasource in Github Events recipe
    
    * Add support to publish github events in StreamGithubEventsCommand
    
    * Replace string with Constants
    
    * refactor: rename kinesis variables to reflect they are kinesis props
    
    * Add missing license headers
    
    * Fix linting errors
    
    * Refactor: Rename variables to be kinesis provider agnostic, Modify 
javadoc to reflect kinesis support
    
    * Add timeouts while checking status of Kinesis stream
    
    * Modify Kinesis producer to detach static aws credentials from endpoint 
provider
    
    * Make accessKey and secretKey configurable for Stream command
    
    * Fix linting errors
    
    * fix linting
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   |  13 ++
 .../stream/kinesis/server/KinesisDataProducer.java | 104 +++++++++++++++
 .../kinesis/server/KinesisDataServerStartable.java | 145 +++++++++++++++++++++
 .../apache/pinot/tools/GitHubEventsQuickstart.java |  86 +++++++++---
 .../command/GitHubEventsQuickStartCommand.java     |  12 +-
 .../admin/command/StreamGitHubEventsCommand.java   |  41 +++++-
 .../PullRequestMergedEventsStream.java             |  37 ++++++
 .../pinot/tools/utils/KinesisStarterUtils.java     |  66 ++++++++++
 .../apache/pinot/tools/utils/StreamSourceType.java |  24 ++++
 ...MergedEvents_kinesis_realtime_table_config.json |  39 ++++++
 10 files changed, 545 insertions(+), 22 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 4bdbe1a..e79c881 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -39,6 +39,7 @@
     <aws.version>2.14.28</aws.version>
     <easymock.version>4.2</easymock.version>
     <reactive.version>1.0.2</reactive.version>
+    <localstack-utils.version>0.2.19</localstack-utils.version>
   </properties>
 
   <dependencyManagement>
@@ -185,6 +186,18 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>cloud.localstack</groupId>
+      <artifactId>localstack-utils</artifactId>
+      <version>${localstack-utils.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>software.amazon.awssdk</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
   </dependencies>
   <profiles>
     <profile>
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
new file mode 100644
index 0000000..150ac78
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kinesis.server;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+
+public class KinesisDataProducer implements StreamDataProducer {
+  public static final String ENDPOINT = "endpoint";
+  public static final String REGION = "region";
+  public static final String ACCESS = "access";
+  public static final String SECRET = "secret";
+  public static final String DEFAULT_PORT = "4566";
+  public static final String DEFAULT_ENDPOINT = "http://localhost:4566";
+
+  private KinesisClient _kinesisClient;
+
+  @Override
+  public void init(Properties props) {
+    try {
+      KinesisClientBuilder kinesisClientBuilder;
+      if (props.containsKey(ACCESS) && props.containsKey(SECRET)) {
+        kinesisClientBuilder = 
KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+            .credentialsProvider(getLocalAWSCredentials(props))
+            .httpClientBuilder(new 
ApacheSdkHttpService().createHttpClientBuilder());
+      } else {
+        kinesisClientBuilder =
+            
KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+                .credentialsProvider(DefaultCredentialsProvider.create())
+                .httpClientBuilder(new 
ApacheSdkHttpService().createHttpClientBuilder());
+      }
+
+      if (props.containsKey(ENDPOINT)) {
+        String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
+        try {
+          kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new 
URI(kinesisEndpoint));
+        } catch (URISyntaxException e) {
+          throw new IllegalArgumentException("URI syntax is not correctly 
specified for endpoint: "
+              + kinesisEndpoint, e);
+        }
+      }
+
+      _kinesisClient = kinesisClientBuilder.build();
+    } catch (Exception e) {
+      _kinesisClient = null;
+    }
+  }
+
+  @Override
+  public void produce(String topic, byte[] payload) {
+    PutRecordRequest putRecordRequest =
+        
PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
+            .partitionKey(UUID.randomUUID().toString()).build();
+    PutRecordResponse putRecordResponse = 
_kinesisClient.putRecord(putRecordRequest);
+  }
+
+  @Override
+  public void produce(String topic, byte[] key, byte[] payload) {
+    PutRecordRequest putRecordRequest =
+        
PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new
 String(key))
+            .build();
+    PutRecordResponse putRecordResponse = 
_kinesisClient.putRecord(putRecordRequest);
+  }
+
+  @Override
+  public void close() {
+    _kinesisClient.close();
+  }
+
+  private AwsCredentialsProvider getLocalAWSCredentials(Properties props) {
+    return StaticCredentialsProvider.create(
+        AwsBasicCredentials.create(props.getProperty(ACCESS), 
props.getProperty(SECRET)));
+  }
+}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.java
new file mode 100644
index 0000000..4bf07f0
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kinesis.server;
+
+import cloud.localstack.Localstack;
+import cloud.localstack.ServiceName;
+import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
+import com.google.common.base.Function;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.utils.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.utils.AttributeMap;
+
+
+public class KinesisDataServerStartable implements StreamDataServerStartable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KinesisDataServerStartable.class);
+
+  public static final String NUM_SHARDS_PROPERTY = "numShards";
+  public static final String DEFAULT_REGION = "us-east-1";
+  public static final String DEFAULT_ACCESS_KEY = "access";
+  public static final String DEFAULT_SECRET_KEY = "secret";
+  public static final String DEFAULT_PORT = "4566";
+
+  private final Localstack _localstackDocker = Localstack.INSTANCE;
+  LocalstackDockerConfiguration _dockerConfig;
+  Properties _serverProperties;
+  private String _localStackKinesisEndpoint = "http://localhost:%s";
+
+  @Override
+  public void init(Properties props) {
+    _serverProperties = props;
+    final Map<String, String> environmentVariables = new HashMap<>();
+    environmentVariables.put("SERVICES", ServiceName.KINESIS);
+    _dockerConfig =
+        
LocalstackDockerConfiguration.builder().portEdge(_serverProperties.getProperty("port",
 DEFAULT_PORT))
+            
.portElasticSearch(String.valueOf(NetUtils.findOpenPort(4571))).imageTag("0.12.15")
+            .environmentVariables(environmentVariables).build();
+
+    _localStackKinesisEndpoint =
+        String.format(_localStackKinesisEndpoint, 
_serverProperties.getProperty("port", DEFAULT_PORT));
+  }
+
+  @Override
+  public void start() {
+    _localstackDocker.startup(_dockerConfig);
+  }
+
+  @Override
+  public void stop() {
+    _localstackDocker.stop();
+  }
+
+  @Override
+  public void createTopic(String topic, Properties topicProps) {
+    try {
+      KinesisClient kinesisClient = KinesisClient.builder().httpClient(
+              new 
ApacheSdkHttpService().createHttpClientBuilder().buildWithDefaults(
+                  
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, 
Boolean.TRUE).build()))
+          
.credentialsProvider(getLocalAWSCredentials()).region(Region.of(DEFAULT_REGION))
+          .endpointOverride(new URI(_localStackKinesisEndpoint)).build();
+
+      kinesisClient.createStream(
+          CreateStreamRequest.builder().streamName(topic).shardCount((Integer) 
topicProps.get(NUM_SHARDS_PROPERTY))
+              .build());
+
+      waitForCondition(new Function<Void, Boolean>() {
+        @Nullable
+        @Override
+        public Boolean apply(@Nullable Void aVoid) {
+          try {
+            String kinesisStreamStatus =
+                
kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(topic).build())
+                    .streamDescription().streamStatusAsString();
+
+            return kinesisStreamStatus.contentEquals("ACTIVE");
+          } catch (Exception e) {
+            LOGGER.warn("Could not fetch kinesis stream status", e);
+            return null;
+          }
+        }
+      }, 1000L, 30000, "Kinesis stream " + topic + " is not created or is not 
in active state");
+
+      LOGGER.info("Kinesis stream created successfully: " + topic);
+    } catch (Exception e) {
+      LOGGER.warn("Error occurred while creating topic: " + topic, e);
+    }
+  }
+
+  @Override
+  public int getPort() {
+    return _localstackDocker.getEdgePort();
+  }
+
+  private AwsCredentialsProvider getLocalAWSCredentials() {
+    return 
StaticCredentialsProvider.create(AwsBasicCredentials.create(DEFAULT_ACCESS_KEY, 
DEFAULT_SECRET_KEY));
+  }
+
+  private static void waitForCondition(Function<Void, Boolean> condition, long 
checkIntervalMs, long timeoutMs,
+      @Nullable String errorMessage) {
+    long endTime = System.currentTimeMillis() + timeoutMs;
+    String errorMessageSuffix = errorMessage != null ? ", error message: " + 
errorMessage : "";
+    while (System.currentTimeMillis() < endTime) {
+      try {
+        if (Boolean.TRUE.equals(condition.apply(null))) {
+          return;
+        }
+        Thread.sleep(checkIntervalMs);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while checking the condition" + 
errorMessageSuffix, e);
+      }
+    }
+    LOGGER.error("Failed to meet condition in " + timeoutMs + "ms" + 
errorMessageSuffix);
+  }
+}
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
index 0af8d64..50c162d 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.spi.stream.StreamDataProvider;
@@ -32,6 +33,8 @@ import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import 
org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
+import org.apache.pinot.tools.utils.KinesisStarterUtils;
+import org.apache.pinot.tools.utils.StreamSourceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,15 +43,16 @@ import static 
org.apache.pinot.tools.Quickstart.prettyPrintResponse;
 
 /**
  * Sets up a demo Pinot cluster with 1 zookeeper, 1 controller, 1 broker and 1 
server
- * Sets up a demo Kafka cluster, and creates a topic pullRequestMergedEvents
+ * Sets up a demo Kafka/Kinesis cluster, and creates a topic 
pullRequestMergedEvents
  * Creates a realtime table pullRequestMergedEvents
  * Starts the {@link PullRequestMergedEventsStream} to publish 
pullRequestMergedEvents into the topic
  */
 public class GitHubEventsQuickstart extends QuickStartBase {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GitHubEventsQuickstart.class);
-  private StreamDataServerStartable _kafkaStarter;
+  private StreamDataServerStartable _serverStarter;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private String _personalAccessToken;
+  private StreamSourceType _sourceType;
 
   public GitHubEventsQuickstart() {
   }
@@ -56,16 +60,46 @@ public class GitHubEventsQuickstart extends QuickStartBase {
   private void startKafka() {
     _zookeeperInstance = ZkStarter.startLocalZkServer();
     try {
-      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
+      _serverStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
           KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance));
     } catch (Exception e) {
       throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
     }
-    _kafkaStarter.start();
-    _kafkaStarter.createTopic("pullRequestMergedEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
+    _serverStarter.start();
+    _serverStarter.createTopic("pullRequestMergedEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
   }
 
-  private void execute(String personalAccessToken)
+  private void startKinesis() {
+    try {
+
+      Properties serverProperties = new Properties();
+      serverProperties.put(KinesisStarterUtils.PORT, 4566);
+      _serverStarter =
+          
StreamDataProvider.getServerDataStartable(KinesisStarterUtils.KINESIS_SERVER_STARTABLE_CLASS_NAME,
+              serverProperties);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + 
KinesisStarterUtils.KINESIS_SERVER_STARTABLE_CLASS_NAME, e);
+    }
+    _serverStarter.start();
+
+    Properties topicProperties = new Properties();
+    topicProperties.put(KinesisStarterUtils.NUM_SHARDS, 3);
+    _serverStarter.createTopic("pullRequestMergedEvents", topicProperties);
+  }
+
+  private void startStreamServer() {
+    switch (_sourceType) {
+      case KINESIS:
+        startKinesis();
+        break;
+      case KAFKA:
+      default:
+        startKafka();
+        break;
+    }
+  }
+
+  private void execute(String personalAccessToken, StreamSourceType 
streamSourceType)
       throws Exception {
     final File quickStartDataDir =
         new File(new File("githubEvents-" + System.currentTimeMillis()), 
"pullRequestMergedEvents");
@@ -81,8 +115,8 @@ public class GitHubEventsQuickstart extends QuickStartBase {
     URL resource = 
classLoader.getResource("examples/stream/githubEvents/pullRequestMergedEvents_schema.json");
     Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, schemaFile);
-    resource =
-        
classLoader.getResource("examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json");
+    String tableConfigFilePath = getTableConfigFilePath();
+    resource = classLoader.getResource(tableConfigFilePath);
     Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
 
@@ -92,8 +126,8 @@ public class GitHubEventsQuickstart extends QuickStartBase {
     final QuickstartRunner runner =
         new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir, 
getConfigOverrides());
 
-    printStatus(Color.CYAN, "***** Starting Kafka *****");
-    startKafka();
+    printStatus(Color.CYAN, String.format("***** Starting %s *****", 
streamSourceType));
+    startStreamServer();
 
     printStatus(Color.CYAN, "***** Starting zookeeper, controller, server and 
broker *****");
     runner.startAll();
@@ -101,10 +135,11 @@ public class GitHubEventsQuickstart extends 
QuickStartBase {
     printStatus(Color.CYAN, "***** Adding pullRequestMergedEvents table 
*****");
     runner.bootstrapTable();
 
-    printStatus(Color.CYAN, "***** Starting pullRequestMergedEvents data 
stream and publishing to Kafka *****");
+    printStatus(Color.CYAN,
+        String.format("***** Starting pullRequestMergedEvents data stream and 
publishing to %s *****", _sourceType));
     final PullRequestMergedEventsStream pullRequestMergedEventsStream =
-        new PullRequestMergedEventsStream(schemaFile.getAbsolutePath(), 
"pullRequestMergedEvents",
-            personalAccessToken, 
PullRequestMergedEventsStream.getKafkaStreamDataProducer());
+        new PullRequestMergedEventsStream(schemaFile.getAbsolutePath(), 
"pullRequestMergedEvents", personalAccessToken,
+            PullRequestMergedEventsStream.getStreamDataProducer(_sourceType));
     pullRequestMergedEventsStream.execute();
     printStatus(Color.CYAN, "***** Waiting for 10 seconds for a few events to 
get populated *****");
     Thread.sleep(10000);
@@ -113,7 +148,7 @@ public class GitHubEventsQuickstart extends QuickStartBase {
       try {
         printStatus(Color.GREEN, "***** Shutting down GitHubEventsQuickStart 
*****");
         runner.stop();
-        _kafkaStarter.stop();
+        _serverStarter.stop();
         ZkStarter.stopLocalZkServer(_zookeeperInstance);
         FileUtils.deleteDirectory(quickStartDataDir);
       } catch (Exception e) {
@@ -156,6 +191,22 @@ public class GitHubEventsQuickstart extends QuickStartBase 
{
     printStatus(Color.GREEN, "You can always go to http://localhost:9000 to 
play around in the query console");
   }
 
+  private String getTableConfigFilePath() {
+    String tableConfigFilePath;
+    switch (_sourceType) {
+      case KINESIS:
+        tableConfigFilePath =
+            
"examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json";
+        break;
+      case KAFKA:
+      default:
+        tableConfigFilePath =
+            
"examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json";
+        break;
+    }
+    return tableConfigFilePath;
+  }
+
   @Override
   public List<String> types() {
     return Arrays.asList("GITHUB-EVENTS", "GITHUB_EVENTS");
@@ -164,11 +215,16 @@ public class GitHubEventsQuickstart extends 
QuickStartBase {
   @Override
   public void execute()
       throws Exception {
-    execute(_personalAccessToken);
+    execute(_personalAccessToken, _sourceType);
   }
 
   public GitHubEventsQuickstart setPersonalAccessToken(String 
personalAccessToken) {
     _personalAccessToken = personalAccessToken;
     return this;
   }
+
+  public GitHubEventsQuickstart setSourceType(String sourceType) {
+    _sourceType = StreamSourceType.valueOf(sourceType.toUpperCase());
+    return this;
+  }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java
index 578f94d..e4cc73d 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java
@@ -33,6 +33,10 @@ public class GitHubEventsQuickStartCommand extends 
AbstractBaseAdminCommand impl
   @CommandLine.Option(names = {"-personalAccessToken"}, required = true, 
description = "GitHub personal access token.")
   private String _personalAccessToken;
 
+  @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka",
+      description = "Stream DataSource to use for ingesting data. Supported 
values - Kafka,Kinesis")
+  private String _sourceType;
+
   @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true, 
description = "Print this message.")
   private boolean _help = false;
 
@@ -40,6 +44,10 @@ public class GitHubEventsQuickStartCommand extends 
AbstractBaseAdminCommand impl
     _personalAccessToken = personalAccessToken;
   }
 
+  public void setSourceType(String sourceType) {
+    _sourceType = sourceType;
+  }
+
   @Override
   public boolean getHelp() {
     return _help;
@@ -52,7 +60,7 @@ public class GitHubEventsQuickStartCommand extends 
AbstractBaseAdminCommand impl
 
   @Override
   public String toString() {
-    return ("GitHubEventsQuickStart -personalAccessToken " + 
_personalAccessToken);
+    return ("GitHubEventsQuickStart -personalAccessToken " + 
_personalAccessToken + " -sourceType" + _sourceType);
   }
 
   @Override
@@ -68,7 +76,7 @@ public class GitHubEventsQuickStartCommand extends 
AbstractBaseAdminCommand impl
   public boolean execute()
       throws Exception {
     PluginManager.get().init();
-    new 
GitHubEventsQuickstart().setPersonalAccessToken(_personalAccessToken).execute();
+    new 
GitHubEventsQuickstart().setPersonalAccessToken(_personalAccessToken).setSourceType(_sourceType).execute();
     return true;
   }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
index 269281d..579dd63 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
@@ -19,14 +19,16 @@
 package org.apache.pinot.tools.admin.command;
 
 import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.tools.Command;
 import 
org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
+import org.apache.pinot.tools.utils.StreamSourceType;
 import picocli.CommandLine;
 
 
 /**
- * Command to stream GitHub events into a kafka topic
+ * Command to stream GitHub events into a kafka topic or kinesis stream
  */
 @CommandLine.Command(name = "StreamGitHubEvents")
 public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand 
implements Command {
@@ -36,11 +38,29 @@ public class StreamGitHubEventsCommand extends 
AbstractBaseAdminCommand implemen
   @CommandLine.Option(names = {"-personalAccessToken"}, required = true, 
description = "GitHub personal access token.")
   private String _personalAccessToken;
 
+  @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka",
+      description = "Stream DataSource to use for ingesting data. Supported 
values - Kafka,Kinesis")
+  private String _sourceType;
+
   @CommandLine.Option(names = {"-kafkaBrokerList"},
       description = "Kafka broker list of the kafka cluster to produce 
events.")
   private String _kafkaBrokerList = KafkaStarterUtils.DEFAULT_KAFKA_BROKER;
 
-  @CommandLine.Option(names = {"-topic"}, required = true, description = "Name 
of kafka topic to publish events.")
+  @CommandLine.Option(names = {"-kinesisEndpoint"},
+      description = "Endpoint of localstack or any other Kinesis cluster when 
not using AWS.")
+  private String _kinesisEndpoint = null;
+
+  @CommandLine.Option(names = {"-awsRegion"}, description = "AWS Region in 
which Kinesis is located")
+  private String _awsRegion = "us-east-1";
+
+  @CommandLine.Option(names = {"-awsAccessKey"}, description = "AccessKey for 
AWS Account.")
+  private String _accessKey;
+
+  @CommandLine.Option(names = {"-awsSecretKey"}, description = "SecretKey for 
AWS Account")
+  private String _secretKey;
+
+  @CommandLine.Option(names = {"-topic"}, required = true,
+      description = "Name of kafka-topic/kinesis-stream to publish events.")
   private String _topic;
 
   @CommandLine.Option(names = {"-eventType"},
@@ -96,7 +116,7 @@ public class StreamGitHubEventsCommand extends 
AbstractBaseAdminCommand implemen
 
   @Override
   public String description() {
-    return "Streams GitHubEvents into a Kafka topic";
+    return "Streams GitHubEvents into a Kafka topic or Kinesis Stream";
   }
 
   @Override
@@ -104,9 +124,20 @@ public class StreamGitHubEventsCommand extends 
AbstractBaseAdminCommand implemen
       throws Exception {
     PluginManager.get().init();
     if (PULL_REQUEST_MERGED_EVENT_TYPE.equals(_eventType)) {
+      StreamDataProducer streamDataProducer;
+      switch (StreamSourceType.valueOf(_sourceType.toUpperCase())) {
+        case KINESIS:
+          streamDataProducer =
+              
PullRequestMergedEventsStream.getKinesisStreamDataProducer(_kinesisEndpoint, 
_awsRegion, _accessKey,
+                  _secretKey);
+          break;
+        case KAFKA:
+        default:
+          streamDataProducer = 
PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList);
+          break;
+      }
       PullRequestMergedEventsStream pullRequestMergedEventsStream =
-          new PullRequestMergedEventsStream(_schemaFile, _topic, 
_personalAccessToken,
-              
PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList));
+          new PullRequestMergedEventsStream(_schemaFile, _topic, 
_personalAccessToken, streamDataProducer);
       pullRequestMergedEventsStream.execute();
     } else {
       throw new UnsupportedOperationException("Event type " + _eventType + " 
is unsupported");
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
index 465f2fd..ab45f45 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
@@ -30,12 +30,15 @@ import java.util.concurrent.Executors;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.Quickstart;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
+import org.apache.pinot.tools.utils.KinesisStarterUtils;
+import org.apache.pinot.tools.utils.StreamSourceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,6 +102,40 @@ public class PullRequestMergedEventsStream {
     return 
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
 properties);
   }
 
+  public static StreamDataProducer getKinesisStreamDataProducer(String 
endpoint, String region, String access,
+      String secret)
+      throws Exception {
+    Properties properties = new Properties();
+
+    if (StringUtils.isNotEmpty(access) && StringUtils.isNotEmpty(secret)) {
+      properties.put("access", access);
+      properties.put("secret", secret);
+    }
+
+    if (StringUtils.isNotEmpty(endpoint)) {
+      properties.put("endpoint", endpoint);
+    }
+    properties.put("region", region);
+    return 
StreamDataProvider.getStreamDataProducer(KinesisStarterUtils.KINESIS_PRODUCER_CLASS_NAME,
 properties);
+  }
+
+  public static StreamDataProducer getKinesisStreamDataProducer()
+      throws Exception {
+    return getKinesisStreamDataProducer("http://localhost:4566", "us-east-1", 
"access", "secret");
+  }
+
+  public static StreamDataProducer getStreamDataProducer(StreamSourceType 
streamSourceType)
+      throws Exception {
+    switch (streamSourceType) {
+      case KAFKA:
+        return getKafkaStreamDataProducer();
+      case KINESIS:
+        return getKinesisStreamDataProducer();
+      default:
+        throw new RuntimeException("Invalid streamSourceType specified: " + 
streamSourceType);
+    }
+  }
+
   public static void main(String[] args)
       throws Exception {
     String personalAccessToken = args[0];
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KinesisStarterUtils.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KinesisStarterUtils.java
new file mode 100644
index 0000000..3f6160a
--- /dev/null
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KinesisStarterUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.utils;
+
+import java.util.Properties;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+
+
+public class KinesisStarterUtils {
+  private KinesisStarterUtils() {
+  }
+
+  public static final String DEFAULT_KINESIS_PORT = "4566";
+  public static final String DEFAULT_KINESIS_ENDPOINT = "http://localhost:"; + 
DEFAULT_KINESIS_PORT;
+
+  public static final String KINESIS_SERVER_STARTABLE_CLASS_NAME =
+      getKinesisConnectorPackageName() + ".server.KinesisDataServerStartable";
+  public static final String KINESIS_PRODUCER_CLASS_NAME =
+      getKinesisConnectorPackageName() + ".server.KinesisDataProducer";
+  public static final String KINESIS_STREAM_CONSUMER_FACTORY_CLASS_NAME =
+      getKinesisConnectorPackageName() + ".KinesisConsumerFactory";
+
+  public static final String PORT = "port";
+  public static final String NUM_SHARDS = "numShards";
+
+  private static String getKinesisConnectorPackageName() {
+    return "org.apache.pinot.plugin.stream.kinesis";
+  }
+
+  public static Properties getTopicCreationProps(int numKinesisShards) {
+    Properties topicProps = new Properties();
+    topicProps.put(NUM_SHARDS, numKinesisShards);
+    return topicProps;
+  }
+
+  public static StreamDataServerStartable startServer(final int port, final 
Properties baseConf) {
+    StreamDataServerStartable kinesisStarter;
+    Properties configuration = new Properties(baseConf);
+    int kinesisPort = port;
+    try {
+      configuration.put(KinesisStarterUtils.PORT, kinesisPort);
+      kinesisStarter = 
StreamDataProvider.getServerDataStartable(KINESIS_SERVER_STARTABLE_CLASS_NAME, 
configuration);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + 
KINESIS_SERVER_STARTABLE_CLASS_NAME, e);
+    }
+    kinesisStarter.start();
+    return kinesisStarter;
+  }
+}
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/StreamSourceType.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/StreamSourceType.java
new file mode 100644
index 0000000..8464512
--- /dev/null
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/StreamSourceType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.utils;
+
/**
 * Supported stream backends for the GitHub events quickstart recipes.
 */
public enum StreamSourceType {
  KAFKA,
  KINESIS
}
diff --git 
a/pinot-tools/src/main/resources/examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json
new file mode 100644
index 0000000..7cdd27b
--- /dev/null
+++ 
b/pinot-tools/src/main/resources/examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json
@@ -0,0 +1,39 @@
+{
+  "tableName": "pullRequestMergedEvents",
+  "tableType": "REALTIME",
+  "segmentsConfig": {
+    "timeColumnName": "mergedTimeMillis",
+    "retentionTimeUnit": "DAYS",
+    "retentionTimeValue": "60",
+    "schemaName": "pullRequestMergedEvents",
+    "replication": "1",
+    "replicasPerPartition": "1"
+  },
+  "tenants": {},
+  "tableIndexConfig": {
+    "loadMode": "MMAP",
+    "invertedIndexColumns": [
+      "organization",
+      "repo"
+    ],
+    "streamConfigs": {
+      "streamType": "kinesis",
+      "stream.kinesis.consumer.type": "lowlevel",
+      "stream.kinesis.topic.name": "pullRequestMergedEvents",
+      "stream.kinesis.decoder.class.name": 
"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
+      "stream.kinesis.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
+      "realtime.segment.flush.threshold.time": "12h",
+      "realtime.segment.flush.threshold.size": "100000",
+      "stream.kinesis.consumer.prop.auto.offset.reset": "smallest",
+      "region": "us-east-1",
+      "shardIteratorType": "TRIM_HORIZON",
+      "endpoint" : "http://localhost:4566",
+      "accessKey" : "access",
+      "secretKey": "secret"
+    }
+  },
+  "metadata": {
+    "customConfigs": {}
+  }
+}
+

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to