npawar commented on a change in pull request #8333: URL: https://github.com/apache/pinot/pull/8333#discussion_r824996660
########## File path: pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java ########## @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.stream.kinesis.server; + +import java.net.URI; +import java.util.Properties; +import java.util.UUID; +import org.apache.pinot.spi.stream.StreamDataProducer; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.apache.ApacheSdkHttpService; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; +import software.amazon.awssdk.utils.AttributeMap; + + +public class KinesisDataProducer implements StreamDataProducer { + public static final String ENDPOINT = "endpoint"; + public static final String REGION = "region"; + public static final String ACCESS = "access"; + public static final String SECRET = "secret"; + public static final String DEFAULT_PORT = "4566"; + public static final String DEFAULT_ENDPOINT = "http://localhost:4566"; + + private KinesisClient _kinesisClient; + + @Override + public void init(Properties props) { + try { + if (props.containsKey(ENDPOINT)) { Review comment: From the other places where I've encountered construction of amazon clients (one example is S3PinotFS, KinesisConnectionHandler), the endpoint should apply regardless of credentials provider. 
So, ``` if (accessKey and secretKey are provided) { credentialsProvider = Basic/Static CredentialsProvider } else { credentialsProvider = DefaultCredentialsProvider } client = build using region and credentialsProvider if (endpoint) { add endpoint override to client } ``` Is it different here on purpose? ########## File path: pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java ########## @@ -19,9 +19,11 @@ package org.apache.pinot.tools.admin.command; import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.tools.Command; import org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream; import org.apache.pinot.tools.utils.KafkaStarterUtils; +import org.apache.pinot.tools.utils.StreamSourceType; import picocli.CommandLine; Review comment: nit: the javadoc also needs to be updated to say Kafka/Kinesis ########## File path: pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.java ########## @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.stream.kinesis.server; + +import cloud.localstack.Localstack; +import cloud.localstack.ServiceName; +import cloud.localstack.docker.annotation.LocalstackDockerConfiguration; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import org.apache.pinot.spi.stream.StreamDataServerStartable; +import org.apache.pinot.spi.utils.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.apache.ApacheSdkHttpService; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.utils.AttributeMap; + + +public class KinesisDataServerStartable implements StreamDataServerStartable { + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisDataServerStartable.class); + + public static final String NUM_SHARDS_PROPERTY = "numShards"; + public static final String DEFAULT_REGION = "us-east-1"; + public static final String DEFAULT_ACCESS_KEY = "access"; + public static final String DEFAULT_SECRET_KEY = "secret"; + public static final String DEFAULT_PORT = "4566"; + + private final Localstack _localstackDocker = Localstack.INSTANCE; + LocalstackDockerConfiguration _dockerConfig; + Properties _serverProperties; + private String _localStackKinesisEndpoint = "http://localhost:%s"; + + @Override + public void init(Properties props) { + _serverProperties = props; + final Map<String, String> environmentVariables = new HashMap<>(); + 
environmentVariables.put("SERVICES", ServiceName.KINESIS); + _dockerConfig = + LocalstackDockerConfiguration.builder().portEdge(_serverProperties.getProperty("port", DEFAULT_PORT)) + .portElasticSearch(String.valueOf(NetUtils.findOpenPort(4571))).imageTag("0.12.15") + .environmentVariables(environmentVariables).build(); + + _localStackKinesisEndpoint = + String.format(_localStackKinesisEndpoint, _serverProperties.getProperty("port", DEFAULT_PORT)); + } + + @Override + public void start() { + _localstackDocker.startup(_dockerConfig); + } + + @Override + public void stop() { + _localstackDocker.stop(); + } + + @Override + public void createTopic(String topic, Properties topicProps) { + try { + KinesisClient kinesisClient = KinesisClient.builder().httpClient( + new ApacheSdkHttpService().createHttpClientBuilder().buildWithDefaults( + AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE).build())) + .credentialsProvider(getLocalAWSCredentials()).region(Region.of(DEFAULT_REGION)) + .endpointOverride(new URI(_localStackKinesisEndpoint)).build(); + + kinesisClient.createStream( + CreateStreamRequest.builder().streamName(topic).shardCount((Integer) topicProps.get(NUM_SHARDS_PROPERTY)) + .build()); + + String kinesisStreamStatus = + kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(topic).build()).streamDescription() + .streamStatusAsString(); + + while (!kinesisStreamStatus.contentEquals("ACTIVE")) { Review comment: wrap this in a waitForCondition loop (like in TestUtils), so we're not stuck here forever in case of some issue ########## File path: pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java ########## @@ -40,32 +43,63 @@ /** * Sets up a demo Pinot cluster with 1 zookeeper, 1 controller, 1 broker and 1 server - * Sets up a demo Kafka cluster, and creates a topic pullRequestMergedEvents + * Sets up a demo Kafka/Kinesis cluster, and creates a topic pullRequestMergedEvents * Creates a 
realtime table pullRequestMergedEvents * Starts the {@link PullRequestMergedEventsStream} to publish pullRequestMergedEvents into the topic */ public class GitHubEventsQuickstart extends QuickStartBase { private static final Logger LOGGER = LoggerFactory.getLogger(GitHubEventsQuickstart.class); - private StreamDataServerStartable _kafkaStarter; + private StreamDataServerStartable _serverStarter; private ZkStarter.ZookeeperInstance _zookeeperInstance; private String _personalAccessToken; + private StreamSourceType _sourceType; public GitHubEventsQuickstart() { } private void startKafka() { _zookeeperInstance = ZkStarter.startLocalZkServer(); try { - _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, + _serverStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance)); } catch (Exception e) { throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); } - _kafkaStarter.start(); - _kafkaStarter.createTopic("pullRequestMergedEvents", KafkaStarterUtils.getTopicCreationProps(2)); + _serverStarter.start(); + _serverStarter.createTopic("pullRequestMergedEvents", KafkaStarterUtils.getTopicCreationProps(2)); } - private void execute(String personalAccessToken) + private void startKinesis() { + try { + + Properties serverProperties = new Properties(); + serverProperties.put(KinesisStarterUtils.PORT, 4566); + _serverStarter = + StreamDataProvider.getServerDataStartable(KinesisStarterUtils.KINESIS_SERVER_STARTABLE_CLASS_NAME, + serverProperties); + } catch (Exception e) { + throw new RuntimeException("Failed to start " + KinesisStarterUtils.KINESIS_SERVER_STARTABLE_CLASS_NAME, e); + } + _serverStarter.start(); + + Properties topicProperties = new Properties(); + topicProperties.put(KinesisStarterUtils.NUM_SHARDS, 3); + 
    _serverStarter.createTopic("pullRequestMergedEvents", topicProperties); + } + + private void startServer() { Review comment: nit: s/startServer/startStreamServer so it isn't confused with the Pinot server, which is commonly started using a startServer method ########## File path: pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.java ########## @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.stream.kinesis.server; + +import cloud.localstack.Localstack; +import cloud.localstack.ServiceName; +import cloud.localstack.docker.annotation.LocalstackDockerConfiguration; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import org.apache.pinot.spi.stream.StreamDataServerStartable; +import org.apache.pinot.spi.utils.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.apache.ApacheSdkHttpService; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.utils.AttributeMap; + + +public class KinesisDataServerStartable implements StreamDataServerStartable { + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisDataServerStartable.class); + + public static final String NUM_SHARDS_PROPERTY = "numShards"; + public static final String DEFAULT_REGION = "us-east-1"; + public static final String DEFAULT_ACCESS_KEY = "access"; + public static final String DEFAULT_SECRET_KEY = "secret"; + public static final String DEFAULT_PORT = "4566"; + + private final Localstack _localstackDocker = Localstack.INSTANCE; + LocalstackDockerConfiguration _dockerConfig; + Properties _serverProperties; + private String _localStackKinesisEndpoint = "http://localhost:%s"; + + @Override + public void init(Properties props) { + _serverProperties = props; + final Map<String, String> environmentVariables = new HashMap<>(); + 
    environmentVariables.put("SERVICES", ServiceName.KINESIS); + _dockerConfig = + LocalstackDockerConfiguration.builder().portEdge(_serverProperties.getProperty("port", DEFAULT_PORT)) + .portElasticSearch(String.valueOf(NetUtils.findOpenPort(4571))).imageTag("0.12.15") + .environmentVariables(environmentVariables).build(); + + _localStackKinesisEndpoint = + String.format(_localStackKinesisEndpoint, _serverProperties.getProperty("port", DEFAULT_PORT)); + } + + @Override + public void start() { + _localstackDocker.startup(_dockerConfig); + } + + @Override + public void stop() { + _localstackDocker.stop(); + } + + @Override + public void createTopic(String topic, Properties topicProps) { + try { + KinesisClient kinesisClient = KinesisClient.builder().httpClient( Review comment: can this code to create the clients (from here, from KinesisDataProducer and from KinesisConnectionHandler) be moved into a common ClientFactory under a utils package in this module? It'll help us stay consistent and make it easy to propagate changes. ########## File path: pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java ########## @@ -36,10 +38,21 @@ @CommandLine.Option(names = {"-personalAccessToken"}, required = true, description = "GitHub personal access token.") private String _personalAccessToken; + @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka", + description = "Stream DataSource to use for ingesting data. Supported values - Kafka,Kinesis") + private String _sourceType; + @CommandLine.Option(names = {"-kafkaBrokerList"}, description = "Kafka broker list of the kafka cluster to produce events.") private String _kafkaBrokerList = KafkaStarterUtils.DEFAULT_KAFKA_BROKER; + @CommandLine.Option(names = {"-kinesisEndpoint"}, Review comment: optional: also provide optional accessKey and secretKey options? One may want to test with a real Kinesis stream, where "access" and "secret" may not work, right?
I can see this command being very useful, and I can see folks wanting to use it to test their own setups. The `getKinesisStreamDataProducer` method would also need to be updated accordingly. ########## File path: pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java ########## @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.stream.kinesis.server; + +import java.net.URI; +import java.util.Properties; +import java.util.UUID; +import org.apache.pinot.spi.stream.StreamDataProducer; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.apache.ApacheSdkHttpService; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; +import software.amazon.awssdk.utils.AttributeMap; + + +public class KinesisDataProducer implements StreamDataProducer { + public static final String ENDPOINT = "endpoint"; + public static final String REGION = "region"; + public static final String ACCESS = "access"; + public static final String SECRET = "secret"; + public static final String DEFAULT_PORT = "4566"; + public static final String DEFAULT_ENDPOINT = "http://localhost:4566"; + + private KinesisClient _kinesisClient; + + @Override + public void init(Properties props) { + try { + if (props.containsKey(ENDPOINT)) { + String localStackKinesisEndpoint; + localStackKinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT); Review comment: rename this to just kinesisEndpoint? This should work for any kind of Kinesis setup that uses an endpoint, right? ########## File path: pinot-tools/src/main/java/org/apache/pinot/tools/utils/KinesisStarterUtils.java ########## @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools.utils; + +import java.util.Properties; +import org.apache.pinot.spi.stream.StreamDataProvider; +import org.apache.pinot.spi.stream.StreamDataServerStartable; + + +public class KinesisStarterUtils { + private KinesisStarterUtils() { + } + + public static final String DEFAULT_KINESIS_PORT = "4566"; + public static final String DEFAULT_KINESIS_ENDPOINT = "http://localhost:" + DEFAULT_KINESIS_PORT; + + public static final String KINESIS_SERVER_STARTABLE_CLASS_NAME = + getKinesisConnectorPackageName() + ".server.KinesisDataServerStartable"; + public static final String KINESIS_PRODUCER_CLASS_NAME = + getKinesisConnectorPackageName() + ".server.KinesisDataProducer"; + public static final String KINESIS_STREAM_CONSUMER_FACTORY_CLASS_NAME = + getKinesisConnectorPackageName() + ".KinesisConsumerFactory"; + + public static final String PORT = "port"; + public static final String NUM_SHARDS = "numShards"; + + private static String getKinesisConnectorPackageName() { + return "org.apache.pinot.plugin.stream.kinesis"; + } + + public static Properties getTopicCreationProps(int numKinesisShards) { Review comment: are the methods in KinesisStarterUtils being used anywhere? 
I only found usages of the static constants in this file. ########## File path: pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java ########## @@ -81,8 +115,9 @@ private void execute(String personalAccessToken) URL resource = classLoader.getResource("examples/stream/githubEvents/pullRequestMergedEvents_schema.json"); Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, schemaFile); - resource = - classLoader.getResource("examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json"); + String tableConfigFilePath; Review comment: nit: merge lines 118 and 119 into one line? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org