This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 348353be88 Handle authentication in pulsar pinot connector (#8338)
348353be88 is described below

commit 348353be88b6f1864bff5d474ecc9f9c254a9eda
Author: mathieudruart <mathieudru...@users.noreply.github.com>
AuthorDate: Tue Apr 12 13:07:52 2022 -0400

    Handle authentication in pulsar pinot connector (#8338)
---
 .../pinot/plugin/stream/pulsar/PulsarConfig.java | 19 +++++++++++++++++++
 .../pulsar/PulsarPartitionLevelConnectionHandler.java | 15 ++++++++++++++-
 .../pulsar/PulsarStreamLevelConsumerManager.java | 17 ++++++++++++++++-
 3 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
index 2ce2c7551a..78db1b766d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
@@ -34,12 +34,16 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 public class PulsarConfig {
   public static final String STREAM_TYPE = "pulsar";
   public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+  public static final String AUTHENTICATION_TOKEN = "authenticationToken";
+  public static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath";
 
   private String _pulsarTopicName;
   private String _subscriberId;
   private String _bootstrapServers;
   private MessageId _initialMessageId;
   private SubscriptionInitialPosition _subscriptionInitialPosition;
+  private String _authenticationToken;
+  private String _tlsTrustCertsFilePath;
 
   public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
     Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
@@ -48,6 +52,13 @@ public class PulsarConfig {
         streamConfigMap.get(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, BOOTSTRAP_SERVERS));
     _subscriberId = subscriberId;
 
+    String authenticationTokenKey = StreamConfigProperties.constructStreamProperty(STREAM_TYPE, AUTHENTICATION_TOKEN);
+    _authenticationToken = streamConfigMap.get(authenticationTokenKey);
+
+    String tlsTrustCertsFilePathKey = StreamConfigProperties.
+        constructStreamProperty(STREAM_TYPE, TLS_TRUST_CERTS_FILE_PATH);
+    _tlsTrustCertsFilePath = streamConfigMap.get(tlsTrustCertsFilePathKey);
+
     Preconditions.checkNotNull(_bootstrapServers, "No brokers provided in the config");
 
     OffsetCriteria offsetCriteria = streamConfig.getOffsetCriteria();
@@ -75,4 +86,12 @@ public class PulsarConfig {
   public SubscriptionInitialPosition getInitialSubscriberPosition() {
     return _subscriptionInitialPosition;
   }
+
+  public String getAuthenticationToken() {
+    return _authenticationToken;
+  }
+
+  public String getTlsTrustCertsFilePath() {
+    return _tlsTrustCertsFilePath;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
index 47dbd97ed4..2aaf9d5694 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
@@ -21,6 +21,9 @@ package org.apache.pinot.plugin.stream.pulsar;
 import java.io.IOException;
 import java.util.List;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.slf4j.Logger;
@@ -50,7 +53,17 @@ public class PulsarPartitionLevelConnectionHandler {
     _topic = _config.getPulsarTopicName();
 
     try {
-      _pulsarClient = PulsarClient.builder().serviceUrl(_config.getBootstrapServers()).build();
+      ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(_config.getBootstrapServers());
+      if (_config.getTlsTrustCertsFilePath() != null) {
+        pulsarClientBuilder.tlsTrustCertsFilePath(_config.getTlsTrustCertsFilePath());
+      }
+
+      if (_config.getAuthenticationToken() != null) {
+        Authentication authentication = AuthenticationFactory.token(_config.getAuthenticationToken());
+        pulsarClientBuilder.authentication(authentication);
+      }
+
+      _pulsarClient = pulsarClientBuilder.build();
 
       _reader = _pulsarClient.newReader().topic(getPartitionedTopicName(partition))
           .startMessageId(_config.getInitialMessageId()).startMessageIdInclusive().create();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
index 325ebccef4..ff23a0cea6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
@@ -25,6 +25,9 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
@@ -75,7 +78,19 @@ public class PulsarStreamLevelConsumerManager {
 
       // Create the consumer
       try {
-        _pulsarClient = PulsarClient.builder().serviceUrl(pulsarStreamLevelStreamConfig.getBootstrapServers()).build();
+        ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(
+            pulsarStreamLevelStreamConfig.getBootstrapServers());
+        if (pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath() != null) {
+          pulsarClientBuilder.tlsTrustCertsFilePath(pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath());
+        }
+
+        if (pulsarStreamLevelStreamConfig.getAuthenticationToken() != null) {
+          Authentication authentication = AuthenticationFactory.token(
+              pulsarStreamLevelStreamConfig.getAuthenticationToken());
+          pulsarClientBuilder.authentication(authentication);
+        }
+
+        _pulsarClient = pulsarClientBuilder.build();
 
         _reader = _pulsarClient.newReader().topic(pulsarStreamLevelStreamConfig.getPulsarTopicName())
             .startMessageId(pulsarStreamLevelStreamConfig.getInitialMessageId()).create();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org