This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 63c1e504db Add SASL_SSL support for stream github event command (#10253) 63c1e504db is described below commit 63c1e504db3b241314899a12d251b39d2604bdfe Author: Seunghyun Lee <seungh...@startree.ai> AuthorDate: Thu Feb 9 01:56:24 2023 -0800 Add SASL_SSL support for stream github event command (#10253) - Add SASL_SSL support to the stream GitHub events command. - This allows the command to be configured to publish events to Kafka topics using either SASL_SSL or PLAINTEXT. --- .../admin/command/StreamGitHubEventsCommand.java | 35 +++++++++++++++------- .../PullRequestMergedEventsStream.java | 18 +++++++++-- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java index 579dd63089..7b0fea34d4 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java @@ -38,16 +38,27 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen @CommandLine.Option(names = {"-personalAccessToken"}, required = true, description = "GitHub personal access token.") private String _personalAccessToken; - @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka", - description = "Stream DataSource to use for ingesting data. Supported values - Kafka,Kinesis") + @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka", description = "Stream DataSource to use for " + + "ingesting data. 
Supported values - Kafka,Kinesis") private String _sourceType; - @CommandLine.Option(names = {"-kafkaBrokerList"}, - description = "Kafka broker list of the kafka cluster to produce events.") + @CommandLine.Option(names = {"-kafkaBrokerList"}, description = "Kafka broker list of the kafka cluster to produce " + + "events.") private String _kafkaBrokerList = KafkaStarterUtils.DEFAULT_KAFKA_BROKER; - @CommandLine.Option(names = {"-kinesisEndpoint"}, - description = "Endpoint of localstack or any other Kinesis cluster when not using AWS.") + @CommandLine.Option(names = {"-kafkaSecurityProtocol"}, description = "Kafka security protocol " + + "(PLAINTEXT/SASL_SSL/etc") + private String _kafkaSecurityProtocol; + + // Only needed when configured as SASL_SSL + @CommandLine.Option(names = {"-kafkaUserName"}, description = "Kafka Username") + private String _kafkaSaslUserName; + + @CommandLine.Option(names = {"-kafkaPassword"}, description = "Kafka Password") + private String _kafkaSaslPassword; + + @CommandLine.Option(names = {"-kinesisEndpoint"}, description = "Endpoint of localstack or any other Kinesis " + + "cluster when not using AWS.") private String _kinesisEndpoint = null; @CommandLine.Option(names = {"-awsRegion"}, description = "AWS Region in which Kinesis is located") @@ -59,12 +70,12 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen @CommandLine.Option(names = {"-awsSecretKey"}, description = "SecretKey for AWS Account") private String _secretKey; - @CommandLine.Option(names = {"-topic"}, required = true, - description = "Name of kafka-topic/kinesis-stream to publish events.") + @CommandLine.Option(names = {"-topic"}, required = true, description = "Name of kafka-topic/kinesis-stream to " + + "publish events.") private String _topic; - @CommandLine.Option(names = {"-eventType"}, - description = "Type of GitHub event. 
Supported types - pullRequestMergedEvent") + @CommandLine.Option(names = {"-eventType"}, description = "Type of GitHub event. Supported types - " + + "pullRequestMergedEvent") private String _eventType = PULL_REQUEST_MERGED_EVENT_TYPE; @CommandLine.Option(names = {"-schemaFile"}, description = "Path to schema file. " @@ -133,7 +144,9 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen break; case KAFKA: default: - streamDataProducer = PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList); + streamDataProducer = + PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList, _kafkaSecurityProtocol, + _kafkaSaslUserName, _kafkaSaslPassword); break; } PullRequestMergedEventsStream pullRequestMergedEventsStream = diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java index 950af0d260..34cc6222a6 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java @@ -82,15 +82,29 @@ public class PullRequestMergedEventsStream { public static StreamDataProducer getKafkaStreamDataProducer() throws Exception { - return getKafkaStreamDataProducer(KafkaStarterUtils.DEFAULT_KAFKA_BROKER); + return getKafkaStreamDataProducer(KafkaStarterUtils.DEFAULT_KAFKA_BROKER, null, null, null); } - public static StreamDataProducer getKafkaStreamDataProducer(String kafkaBrokerList) + public static StreamDataProducer getKafkaStreamDataProducer(String kafkaBrokerList, String kafkaSecurityProtocol, + String kafkaSaslUserName, String kafkaSaslPassword) throws Exception { Properties properties = new Properties(); properties.put("metadata.broker.list", kafkaBrokerList); properties.put("serializer.class", 
"kafka.serializer.DefaultEncoder"); properties.put("request.required.acks", "1"); + + if (StringUtils.isNotEmpty(kafkaSecurityProtocol)) { + properties.put("security.protocol", kafkaSecurityProtocol); + // If the protocol is 'SASL_SSL', fill the sasl related configs + if (kafkaSecurityProtocol.equals("SASL_SSL") && StringUtils.isNotEmpty(kafkaSaslUserName) + && StringUtils.isNotEmpty(kafkaSaslPassword)) { + properties.put("sasl.mechanism", "PLAIN"); + String jaasConfig = String.format( + "org.apache.kafka.common.security.plain.PlainLoginModule required \n username=\"%s\" \n password=\"%s\";", + kafkaSaslUserName, kafkaSaslPassword); + properties.put("sasl.jaas.config", jaasConfig); + } + } return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org