This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f60bfc8 Add support for Kinesis datasource in Github Events recipe (#8333) f60bfc8 is described below commit f60bfc8df6172c7d00f7ca9bd9d2995dde8d33e3 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Fri Mar 18 00:07:03 2022 +0530 Add support for Kinesis datasource in Github Events recipe (#8333) * Add support for Kinesis datasource in Github Events recipe * Add support to publish github events in StreamGithubEventsCommand * Replace string with Constants * refactor: rename kinesis variables to reflect they are kinesis props * Add missing license headers * Fix linting errors * Refactor: Rename variables to be kinesis provider agnostic, Modify javadoc to reflect kinesis support * Add timeouts while checking status of Kinesis stream * Modify Kinesis producer to detach static aws credentials from endpoint provider * Make accessKey and secretKey configurable for Stream command * Fix linting errors * fix linting --- .../pinot-stream-ingestion/pinot-kinesis/pom.xml | 13 ++ .../stream/kinesis/server/KinesisDataProducer.java | 104 +++++++++++++++ .../kinesis/server/KinesisDataServerStartable.java | 145 +++++++++++++++++++++ .../apache/pinot/tools/GitHubEventsQuickstart.java | 86 +++++++++--- .../command/GitHubEventsQuickStartCommand.java | 12 +- .../admin/command/StreamGitHubEventsCommand.java | 41 +++++- .../PullRequestMergedEventsStream.java | 37 ++++++ .../pinot/tools/utils/KinesisStarterUtils.java | 66 ++++++++++ .../apache/pinot/tools/utils/StreamSourceType.java | 24 ++++ ...MergedEvents_kinesis_realtime_table_config.json | 39 ++++++ 10 files changed, 545 insertions(+), 22 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml index 4bdbe1a..e79c881 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml @@ -39,6 +39,7 
@@ <aws.version>2.14.28</aws.version> <easymock.version>4.2</easymock.version> <reactive.version>1.0.2</reactive.version> + <localstack-utils.version>0.2.19</localstack-utils.version> </properties> <dependencyManagement> @@ -185,6 +186,18 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>cloud.localstack</groupId> + <artifactId>localstack-utils</artifactId> + <version>${localstack-utils.version}</version> + <exclusions> + <exclusion> + <groupId>software.amazon.awssdk</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> <profiles> <profile> diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java new file mode 100644 index 0000000..150ac78 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.stream.kinesis.server; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; +import java.util.UUID; +import org.apache.pinot.spi.stream.StreamDataProducer; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.http.apache.ApacheSdkHttpService; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; + +public class KinesisDataProducer implements StreamDataProducer { + public static final String ENDPOINT = "endpoint"; + public static final String REGION = "region"; + public static final String ACCESS = "access"; + public static final String SECRET = "secret"; + public static final String DEFAULT_PORT = "4566"; + public static final String DEFAULT_ENDPOINT = "http://localhost:4566"; + + private KinesisClient _kinesisClient; + + @Override + public void init(Properties props) { + try { + KinesisClientBuilder kinesisClientBuilder; + if (props.containsKey(ACCESS) && props.containsKey(SECRET)) { + kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION))) + .credentialsProvider(getLocalAWSCredentials(props)) + .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder()); + } else { + kinesisClientBuilder = + KinesisClient.builder().region(Region.of(props.getProperty(REGION))) + .credentialsProvider(DefaultCredentialsProvider.create()) + .httpClientBuilder(new 
ApacheSdkHttpService().createHttpClientBuilder()); + } + + if (props.containsKey(ENDPOINT)) { + String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT); + try { + kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint)); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + + kinesisEndpoint, e); + } + } + + _kinesisClient = kinesisClientBuilder.build(); + } catch (Exception e) { + throw new IllegalStateException("Failed to create Kinesis client", e); + } + } + + @Override + public void produce(String topic, byte[] payload) { + PutRecordRequest putRecordRequest = + PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)) + .partitionKey(UUID.randomUUID().toString()).build(); + PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest); + } + + @Override + public void produce(String topic, byte[] key, byte[] payload) { + PutRecordRequest putRecordRequest = + PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new String(key)) + .build(); + PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest); + } + + @Override + public void close() { + _kinesisClient.close(); + } + + private AwsCredentialsProvider getLocalAWSCredentials(Properties props) { + return StaticCredentialsProvider.create( + AwsBasicCredentials.create(props.getProperty(ACCESS), props.getProperty(SECRET))); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.java new file mode 100644 index 0000000..4bf07f0 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.java @@ -0,0 
+1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kinesis.server; + +import cloud.localstack.Localstack; +import cloud.localstack.ServiceName; +import cloud.localstack.docker.annotation.LocalstackDockerConfiguration; +import com.google.common.base.Function; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import javax.annotation.Nullable; +import org.apache.pinot.spi.stream.StreamDataServerStartable; +import org.apache.pinot.spi.utils.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.apache.ApacheSdkHttpService; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import 
software.amazon.awssdk.utils.AttributeMap; + + +public class KinesisDataServerStartable implements StreamDataServerStartable { + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisDataServerStartable.class); + + public static final String NUM_SHARDS_PROPERTY = "numShards"; + public static final String DEFAULT_REGION = "us-east-1"; + public static final String DEFAULT_ACCESS_KEY = "access"; + public static final String DEFAULT_SECRET_KEY = "secret"; + public static final String DEFAULT_PORT = "4566"; + + private final Localstack _localstackDocker = Localstack.INSTANCE; + LocalstackDockerConfiguration _dockerConfig; + Properties _serverProperties; + private String _localStackKinesisEndpoint = "http://localhost:%s"; + + @Override + public void init(Properties props) { + _serverProperties = props; + final Map<String, String> environmentVariables = new HashMap<>(); + environmentVariables.put("SERVICES", ServiceName.KINESIS); + _dockerConfig = + LocalstackDockerConfiguration.builder().portEdge(_serverProperties.getProperty("port", DEFAULT_PORT)) + .portElasticSearch(String.valueOf(NetUtils.findOpenPort(4571))).imageTag("0.12.15") + .environmentVariables(environmentVariables).build(); + + _localStackKinesisEndpoint = + String.format(_localStackKinesisEndpoint, _serverProperties.getProperty("port", DEFAULT_PORT)); + } + + @Override + public void start() { + _localstackDocker.startup(_dockerConfig); + } + + @Override + public void stop() { + _localstackDocker.stop(); + } + + @Override + public void createTopic(String topic, Properties topicProps) { + try { + KinesisClient kinesisClient = KinesisClient.builder().httpClient( + new ApacheSdkHttpService().createHttpClientBuilder().buildWithDefaults( + AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE).build())) + .credentialsProvider(getLocalAWSCredentials()).region(Region.of(DEFAULT_REGION)) + .endpointOverride(new URI(_localStackKinesisEndpoint)).build(); + + 
kinesisClient.createStream( + CreateStreamRequest.builder().streamName(topic).shardCount((Integer) topicProps.get(NUM_SHARDS_PROPERTY)) + .build()); + + waitForCondition(new Function<Void, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable Void aVoid) { + try { + String kinesisStreamStatus = + kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(topic).build()) + .streamDescription().streamStatusAsString(); + + return kinesisStreamStatus.contentEquals("ACTIVE"); + } catch (Exception e) { + LOGGER.warn("Could not fetch kinesis stream status", e); + return null; + } + } + }, 1000L, 30000, "Kinesis stream " + topic + " is not created or is not in active state"); + + LOGGER.info("Kinesis stream created successfully: " + topic); + } catch (Exception e) { + LOGGER.warn("Error occurred while creating topic: " + topic, e); + } + } + + @Override + public int getPort() { + return _localstackDocker.getEdgePort(); + } + + private AwsCredentialsProvider getLocalAWSCredentials() { + return StaticCredentialsProvider.create(AwsBasicCredentials.create(DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY)); + } + + private static void waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs, + @Nullable String errorMessage) { + long endTime = System.currentTimeMillis() + timeoutMs; + String errorMessageSuffix = errorMessage != null ? 
", error message: " + errorMessage : ""; + while (System.currentTimeMillis() < endTime) { + try { + if (Boolean.TRUE.equals(condition.apply(null))) { + return; + } + Thread.sleep(checkIntervalMs); + } catch (Exception e) { + LOGGER.error("Caught exception while checking the condition" + errorMessageSuffix, e); + } + } + LOGGER.error("Failed to meet condition in " + timeoutMs + "ms" + errorMessageSuffix); + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java index 0af8d64..50c162d 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java @@ -24,6 +24,7 @@ import java.io.File; import java.net.URL; import java.util.Arrays; import java.util.List; +import java.util.Properties; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.ZkStarter; import org.apache.pinot.spi.stream.StreamDataProvider; @@ -32,6 +33,8 @@ import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.command.QuickstartRunner; import org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream; import org.apache.pinot.tools.utils.KafkaStarterUtils; +import org.apache.pinot.tools.utils.KinesisStarterUtils; +import org.apache.pinot.tools.utils.StreamSourceType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,15 +43,16 @@ import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; /** * Sets up a demo Pinot cluster with 1 zookeeper, 1 controller, 1 broker and 1 server - * Sets up a demo Kafka cluster, and creates a topic pullRequestMergedEvents + * Sets up a demo Kafka/Kinesis cluster, and creates a topic pullRequestMergedEvents * Creates a realtime table pullRequestMergedEvents * Starts the {@link PullRequestMergedEventsStream} to publish pullRequestMergedEvents into the topic */ public 
class GitHubEventsQuickstart extends QuickStartBase { private static final Logger LOGGER = LoggerFactory.getLogger(GitHubEventsQuickstart.class); - private StreamDataServerStartable _kafkaStarter; + private StreamDataServerStartable _serverStarter; private ZkStarter.ZookeeperInstance _zookeeperInstance; private String _personalAccessToken; + private StreamSourceType _sourceType = StreamSourceType.KAFKA; public GitHubEventsQuickstart() { } @@ -56,16 +60,46 @@ public class GitHubEventsQuickstart extends QuickStartBase { private void startKafka() { _zookeeperInstance = ZkStarter.startLocalZkServer(); try { - _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, + _serverStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance)); } catch (Exception e) { throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); } - _kafkaStarter.start(); - _kafkaStarter.createTopic("pullRequestMergedEvents", KafkaStarterUtils.getTopicCreationProps(2)); + _serverStarter.start(); + _serverStarter.createTopic("pullRequestMergedEvents", KafkaStarterUtils.getTopicCreationProps(2)); } - private void execute(String personalAccessToken) + private void startKinesis() { + try { + + Properties serverProperties = new Properties(); + serverProperties.put(KinesisStarterUtils.PORT, KinesisStarterUtils.DEFAULT_KINESIS_PORT); + _serverStarter = + StreamDataProvider.getServerDataStartable(KinesisStarterUtils.KINESIS_SERVER_STARTABLE_CLASS_NAME, + serverProperties); + } catch (Exception e) { + throw new RuntimeException("Failed to start " + KinesisStarterUtils.KINESIS_SERVER_STARTABLE_CLASS_NAME, e); + } + _serverStarter.start(); + + Properties topicProperties = new Properties(); + topicProperties.put(KinesisStarterUtils.NUM_SHARDS, 3); + _serverStarter.createTopic("pullRequestMergedEvents", topicProperties); + } + + private void startStreamServer() { 
+ switch (_sourceType) { + case KINESIS: + startKinesis(); + break; + case KAFKA: + default: + startKafka(); + break; + } + } + + private void execute(String personalAccessToken, StreamSourceType streamSourceType) throws Exception { final File quickStartDataDir = new File(new File("githubEvents-" + System.currentTimeMillis()), "pullRequestMergedEvents"); @@ -81,8 +115,8 @@ public class GitHubEventsQuickstart extends QuickStartBase { URL resource = classLoader.getResource("examples/stream/githubEvents/pullRequestMergedEvents_schema.json"); Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, schemaFile); - resource = - classLoader.getResource("examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json"); + String tableConfigFilePath = getTableConfigFilePath(); + resource = classLoader.getResource(tableConfigFilePath); Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, tableConfigFile); @@ -92,8 +126,8 @@ public class GitHubEventsQuickstart extends QuickStartBase { final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir, getConfigOverrides()); - printStatus(Color.CYAN, "***** Starting Kafka *****"); - startKafka(); + printStatus(Color.CYAN, String.format("***** Starting %s *****", streamSourceType)); + startStreamServer(); printStatus(Color.CYAN, "***** Starting zookeeper, controller, server and broker *****"); runner.startAll(); @@ -101,10 +135,11 @@ public class GitHubEventsQuickstart extends QuickStartBase { printStatus(Color.CYAN, "***** Adding pullRequestMergedEvents table *****"); runner.bootstrapTable(); - printStatus(Color.CYAN, "***** Starting pullRequestMergedEvents data stream and publishing to Kafka *****"); + printStatus(Color.CYAN, + String.format("***** Starting pullRequestMergedEvents data stream and publishing to %s *****", _sourceType)); final PullRequestMergedEventsStream pullRequestMergedEventsStream = - new 
PullRequestMergedEventsStream(schemaFile.getAbsolutePath(), "pullRequestMergedEvents", - personalAccessToken, PullRequestMergedEventsStream.getKafkaStreamDataProducer()); + new PullRequestMergedEventsStream(schemaFile.getAbsolutePath(), "pullRequestMergedEvents", personalAccessToken, + PullRequestMergedEventsStream.getStreamDataProducer(_sourceType)); pullRequestMergedEventsStream.execute(); printStatus(Color.CYAN, "***** Waiting for 10 seconds for a few events to get populated *****"); Thread.sleep(10000); @@ -113,7 +148,7 @@ public class GitHubEventsQuickstart extends QuickStartBase { try { printStatus(Color.GREEN, "***** Shutting down GitHubEventsQuickStart *****"); runner.stop(); - _kafkaStarter.stop(); + _serverStarter.stop(); ZkStarter.stopLocalZkServer(_zookeeperInstance); FileUtils.deleteDirectory(quickStartDataDir); } catch (Exception e) { @@ -156,6 +191,22 @@ public class GitHubEventsQuickstart extends QuickStartBase { printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console"); } + private String getTableConfigFilePath() { + String tableConfigFilePath; + switch (_sourceType) { + case KINESIS: + tableConfigFilePath = + "examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json"; + break; + case KAFKA: + default: + tableConfigFilePath = + "examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json"; + break; + } + return tableConfigFilePath; + } + @Override public List<String> types() { return Arrays.asList("GITHUB-EVENTS", "GITHUB_EVENTS"); @@ -164,11 +215,16 @@ public class GitHubEventsQuickstart extends QuickStartBase { @Override public void execute() throws Exception { - execute(_personalAccessToken); + execute(_personalAccessToken, _sourceType); } public GitHubEventsQuickstart setPersonalAccessToken(String personalAccessToken) { _personalAccessToken = personalAccessToken; return this; } + + public GitHubEventsQuickstart setSourceType(String sourceType) { 
+ _sourceType = StreamSourceType.valueOf(sourceType.toUpperCase(java.util.Locale.ROOT)); + return this; + } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java index 578f94d..e4cc73d 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java @@ -33,6 +33,10 @@ public class GitHubEventsQuickStartCommand extends AbstractBaseAdminCommand impl @CommandLine.Option(names = {"-personalAccessToken"}, required = true, description = "GitHub personal access token.") private String _personalAccessToken; + @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka", + description = "Stream DataSource to use for ingesting data. Supported values - Kafka,Kinesis") + private String _sourceType; + @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true, description = "Print this message.") private boolean _help = false; @@ -40,6 +44,10 @@ public class GitHubEventsQuickStartCommand extends AbstractBaseAdminCommand impl _personalAccessToken = personalAccessToken; } + public void setSourceType(String sourceType) { + _sourceType = sourceType; + } + @Override public boolean getHelp() { return _help; @@ -52,7 +60,7 @@ public class GitHubEventsQuickStartCommand extends AbstractBaseAdminCommand impl @Override public String toString() { - return ("GitHubEventsQuickStart -personalAccessToken " + _personalAccessToken); + return ("GitHubEventsQuickStart -personalAccessToken " + _personalAccessToken + " -sourceType " + _sourceType); } @Override @@ -68,7 +76,7 @@ public class GitHubEventsQuickStartCommand extends AbstractBaseAdminCommand impl public boolean execute() throws Exception { PluginManager.get().init(); - new
GitHubEventsQuickstart().setPersonalAccessToken(_personalAccessToken).execute(); + new GitHubEventsQuickstart().setPersonalAccessToken(_personalAccessToken).setSourceType(_sourceType).execute(); return true; } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java index 269281d..579dd63 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java @@ -19,14 +19,16 @@ package org.apache.pinot.tools.admin.command; import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.tools.Command; import org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream; import org.apache.pinot.tools.utils.KafkaStarterUtils; +import org.apache.pinot.tools.utils.StreamSourceType; import picocli.CommandLine; /** - * Command to stream GitHub events into a kafka topic + * Command to stream GitHub events into a kafka topic or kinesis stream */ @CommandLine.Command(name = "StreamGitHubEvents") public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implements Command { @@ -36,11 +38,29 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen @CommandLine.Option(names = {"-personalAccessToken"}, required = true, description = "GitHub personal access token.") private String _personalAccessToken; + @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka", + description = "Stream DataSource to use for ingesting data. 
Supported values - Kafka,Kinesis") + private String _sourceType; + @CommandLine.Option(names = {"-kafkaBrokerList"}, description = "Kafka broker list of the kafka cluster to produce events.") private String _kafkaBrokerList = KafkaStarterUtils.DEFAULT_KAFKA_BROKER; - @CommandLine.Option(names = {"-topic"}, required = true, description = "Name of kafka topic to publish events.") + @CommandLine.Option(names = {"-kinesisEndpoint"}, + description = "Endpoint of localstack or any other Kinesis cluster when not using AWS.") + private String _kinesisEndpoint = null; + + @CommandLine.Option(names = {"-awsRegion"}, description = "AWS Region in which Kinesis is located") + private String _awsRegion = "us-east-1"; + + @CommandLine.Option(names = {"-awsAccessKey"}, description = "AccessKey for AWS Account.") + private String _accessKey; + + @CommandLine.Option(names = {"-awsSecretKey"}, description = "SecretKey for AWS Account") + private String _secretKey; + + @CommandLine.Option(names = {"-topic"}, required = true, + description = "Name of kafka-topic/kinesis-stream to publish events.") private String _topic; @CommandLine.Option(names = {"-eventType"}, @@ -96,7 +116,7 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen @Override public String description() { - return "Streams GitHubEvents into a Kafka topic"; + return "Streams GitHubEvents into a Kafka topic or Kinesis Stream"; } @Override @@ -104,9 +124,20 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen throws Exception { PluginManager.get().init(); if (PULL_REQUEST_MERGED_EVENT_TYPE.equals(_eventType)) { + StreamDataProducer streamDataProducer; + switch (StreamSourceType.valueOf(_sourceType.toUpperCase(java.util.Locale.ROOT))) { + case KINESIS: + streamDataProducer = + PullRequestMergedEventsStream.getKinesisStreamDataProducer(_kinesisEndpoint, _awsRegion, _accessKey, + _secretKey); + break; + case KAFKA: + default: + streamDataProducer = 
PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList); + break; + } PullRequestMergedEventsStream pullRequestMergedEventsStream = - new PullRequestMergedEventsStream(_schemaFile, _topic, _personalAccessToken, - PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList)); + new PullRequestMergedEventsStream(_schemaFile, _topic, _personalAccessToken, streamDataProducer); pullRequestMergedEventsStream.execute(); } else { throw new UnsupportedOperationException("Event type " + _eventType + " is unsupported"); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java index 465f2fd..ab45f45 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java @@ -30,12 +30,15 @@ import java.util.concurrent.Executors; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.plugin.inputformat.avro.AvroUtils; import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.spi.stream.StreamDataProvider; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.Quickstart; import org.apache.pinot.tools.utils.KafkaStarterUtils; +import org.apache.pinot.tools.utils.KinesisStarterUtils; +import org.apache.pinot.tools.utils.StreamSourceType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,6 +102,40 @@ public class PullRequestMergedEventsStream { return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); } + public static StreamDataProducer getKinesisStreamDataProducer(String endpoint, String 
region, String access, + String secret) + throws Exception { + Properties properties = new Properties(); + + if (StringUtils.isNotEmpty(access) && StringUtils.isNotEmpty(secret)) { + properties.put("access", access); + properties.put("secret", secret); + } + + if (StringUtils.isNotEmpty(endpoint)) { + properties.put("endpoint", endpoint); + } + properties.put("region", region); + return StreamDataProvider.getStreamDataProducer(KinesisStarterUtils.KINESIS_PRODUCER_CLASS_NAME, properties); + } + + public static StreamDataProducer getKinesisStreamDataProducer() + throws Exception { + return getKinesisStreamDataProducer("http://localhost:4566", "us-east-1", "access", "secret"); + } + + public static StreamDataProducer getStreamDataProducer(StreamSourceType streamSourceType) + throws Exception { + switch (streamSourceType) { + case KAFKA: + return getKafkaStreamDataProducer(); + case KINESIS: + return getKinesisStreamDataProducer(); + default: + throw new RuntimeException("Invalid streamSourceType specified: " + streamSourceType); + } + } + public static void main(String[] args) throws Exception { String personalAccessToken = args[0]; diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KinesisStarterUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KinesisStarterUtils.java new file mode 100644 index 0000000..3f6160a --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KinesisStarterUtils.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools.utils; + +import java.util.Properties; +import org.apache.pinot.spi.stream.StreamDataProvider; +import org.apache.pinot.spi.stream.StreamDataServerStartable; + + +public class KinesisStarterUtils { + private KinesisStarterUtils() { + } + + public static final String DEFAULT_KINESIS_PORT = "4566"; + public static final String DEFAULT_KINESIS_ENDPOINT = "http://localhost:" + DEFAULT_KINESIS_PORT; + + public static final String KINESIS_SERVER_STARTABLE_CLASS_NAME = + getKinesisConnectorPackageName() + ".server.KinesisDataServerStartable"; + public static final String KINESIS_PRODUCER_CLASS_NAME = + getKinesisConnectorPackageName() + ".server.KinesisDataProducer"; + public static final String KINESIS_STREAM_CONSUMER_FACTORY_CLASS_NAME = + getKinesisConnectorPackageName() + ".KinesisConsumerFactory"; + + public static final String PORT = "port"; + public static final String NUM_SHARDS = "numShards"; + + private static String getKinesisConnectorPackageName() { + return "org.apache.pinot.plugin.stream.kinesis"; + } + + public static Properties getTopicCreationProps(int numKinesisShards) { + Properties topicProps = new Properties(); + topicProps.put(NUM_SHARDS, numKinesisShards); + return topicProps; + } + + public static StreamDataServerStartable startServer(final int port, final Properties baseConf) { + StreamDataServerStartable kinesisStarter; + Properties configuration = new Properties(baseConf); + int kinesisPort = port; + try { + configuration.put(KinesisStarterUtils.PORT, String.valueOf(kinesisPort)); + kinesisStarter = 
StreamDataProvider.getServerDataStartable(KINESIS_SERVER_STARTABLE_CLASS_NAME, configuration); + } catch (Exception e) { + throw new RuntimeException("Failed to start " + KINESIS_SERVER_STARTABLE_CLASS_NAME, e); + } + kinesisStarter.start(); + return kinesisStarter; + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/StreamSourceType.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/StreamSourceType.java new file mode 100644 index 0000000..8464512 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/StreamSourceType.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tools.utils; + +public enum StreamSourceType { + KAFKA, + KINESIS; +} diff --git a/pinot-tools/src/main/resources/examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json new file mode 100644 index 0000000..7cdd27b --- /dev/null +++ b/pinot-tools/src/main/resources/examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json @@ -0,0 +1,39 @@ +{ + "tableName": "pullRequestMergedEvents", + "tableType": "REALTIME", + "segmentsConfig": { + "timeColumnName": "mergedTimeMillis", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "60", + "schemaName": "pullRequestMergedEvents", + "replication": "1", + "replicasPerPartition": "1" + }, + "tenants": {}, + "tableIndexConfig": { + "loadMode": "MMAP", + "invertedIndexColumns": [ + "organization", + "repo" + ], + "streamConfigs": { + "streamType": "kinesis", + "stream.kinesis.consumer.type": "lowlevel", + "stream.kinesis.topic.name": "pullRequestMergedEvents", + "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder", + "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory", + "realtime.segment.flush.threshold.time": "12h", + "realtime.segment.flush.threshold.size": "100000", + "stream.kinesis.consumer.prop.auto.offset.reset": "smallest", + "region": "us-east-1", + "shardIteratorType": "TRIM_HORIZON", + "endpoint" : "http://localhost:4566", + "accessKey" : "access", + "secretKey": "secret" + } + }, + "metadata": { + "customConfigs": {} + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org