This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8ef4dba Add tlsPort into InstanceConfig so Pinot can start both HTTP and HTTPS ports (#8313) 8ef4dba is described below commit 8ef4dbacfb42596590d08c7c6f14f88093dfe7d5 Author: Xiaoman Dong <xiao...@startree.ai> AuthorDate: Thu Mar 10 10:18:07 2022 -0800 Add tlsPort into InstanceConfig so Pinot can start both HTTP and HTTPS ports (#8313) * save temp work * update controller tlsport too * fix checkstyle * fix broken test --- .../broker/broker/helix/BaseBrokerStarter.java | 5 + .../apache/pinot/client/DynamicBrokerSelector.java | 16 ++- .../apache/pinot/client/ExternalViewReader.java | 64 ++++++++++-- .../pinot/client/DynamicBrokerSelectorTest.java | 5 + .../pinot/client/ExternalViewReaderTest.java | 109 +++++++++++++++++++++ .../pinot/common/helix/ExtraInstanceConfig.java | 47 +++++++++ .../pinot/common/utils/helix/HelixHelper.java | 13 +++ .../pinot/controller/BaseControllerStarter.java | 5 + .../controller/util/ListenerConfigUtilTest.java | 26 +++++ .../apache/pinot/core/util/ListenerConfigUtil.java | 16 +++ .../integration/tests/TlsIntegrationTest.java | 15 +++ 11 files changed, 309 insertions(+), 12 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index 3db3f88..ca59fcd 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -85,6 +85,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable { protected String _zkServers; protected String _hostname; protected int _port; + protected int _tlsPort; protected String _instanceId; private volatile boolean _isStarting = false; private volatile boolean _isShuttingDown = false; @@ -125,6 +126,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable { : NetUtils.getHostAddress(); } _port = _listenerConfigs.get(0).getPort(); + _tlsPort = ListenerConfigUtil.findLastTlsPort(_listenerConfigs, -1); _instanceId = _brokerConf.getProperty(Helix.Instance.INSTANCE_ID_KEY); if (_instanceId != null) { @@ -328,6 +330,9 @@ public abstract class BaseBrokerStarter implements ServiceStartable { private void updateInstanceConfigAndBrokerResourceIfNeeded() { InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId); boolean instanceConfigUpdated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port); + if (_tlsPort > 0) { + HelixHelper.updateTlsPort(instanceConfig, _tlsPort); + } boolean shouldUpdateBrokerResource = false; String brokerTag = null; List<String> instanceTags = instanceConfig.getTags(); diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java index ddd10b4..44f97ac 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java @@ -45,16 +45,19 @@ public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener { private final ZkClient _zkClient; private final ExternalViewReader _evReader; private final List<String> _brokerList; - - public DynamicBrokerSelector(String zkServers) { + //The preferTlsPort will be mapped to client config in the future, when we support full TLS + public DynamicBrokerSelector(String zkServers, boolean preferTlsPort) { _zkClient = getZkClient(zkServers); _zkClient.setZkSerializer(new BytesPushThroughSerializer()); _zkClient.waitUntilConnected(60, TimeUnit.SECONDS); _zkClient.subscribeDataChanges(ExternalViewReader.BROKER_EXTERNAL_VIEW_PATH, this); - _evReader = getEvReader(_zkClient); + _evReader = getEvReader(_zkClient, preferTlsPort); _brokerList = ImmutableList.of(zkServers); refresh(); } + public DynamicBrokerSelector(String zkServers) { + this(zkServers, false); + } @VisibleForTesting protected ZkClient getZkClient(String zkServers) { @@ -63,7 +66,12 @@ public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener { @VisibleForTesting protected ExternalViewReader getEvReader(ZkClient zkClient) { - return new ExternalViewReader(zkClient); + return getEvReader(zkClient, false); + } + + @VisibleForTesting + protected ExternalViewReader getEvReader(ZkClient zkClient, boolean preferTlsPort) { + return new ExternalViewReader(zkClient, preferTlsPort); } private void refresh() { diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExternalViewReader.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExternalViewReader.java index 1a74285..32bd125 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExternalViewReader.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExternalViewReader.java @@ -18,12 +18,16 @@ */ package org.apache.pinot.client; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -45,14 +49,25 @@ public class ExternalViewReader { private static final Logger LOGGER = LoggerFactory.getLogger(ExternalViewReader.class); private static final ObjectReader OBJECT_READER = new ObjectMapper().reader(); public static final String BROKER_EXTERNAL_VIEW_PATH = "/EXTERNALVIEW/brokerResource"; + public static final String BROKER_INSTANCE_PATH = "/CONFIGS/PARTICIPANT"; public static final String REALTIME_SUFFIX = "_REALTIME"; public static final String OFFLINE_SUFFIX = "_OFFLINE"; + public static final String KEY_PINOT_TLS_PORT = "PINOT_TLS_PORT"; + public static final String KEY_SIMPLE_FIELDS = "simpleFields"; + public static final String KEY_HELIX_HOST = "HELIX_HOST"; + public static final String KEY_HELIX_PORT = "HELIX_PORT"; private ZkClient _zkClient; - public ExternalViewReader(ZkClient zkClient) { + @VisibleForTesting + boolean _preferTlsPort; + public ExternalViewReader(ZkClient zkClient, boolean preferTlsPort) { + _preferTlsPort = preferTlsPort; _zkClient = zkClient; } + public ExternalViewReader(ZkClient zkClient) { + this(zkClient, false); + } public List<String> getLiveBrokers() { List<String> brokerUrls = new ArrayList<>(); @@ -70,9 +85,7 @@ public class ExternalViewReader { Entry<String, JsonNode> brokerEntry = brokerEntries.next(); String brokerName = brokerEntry.getKey(); if (brokerName.startsWith("Broker_") && "ONLINE".equals(brokerEntry.getValue().asText())) { - // Turn Broker_12.34.56.78_1234 into 12.34.56.78:1234 - String brokerHostPort = brokerName.replace("Broker_", "").replace("_", ":"); - brokerUrls.add(brokerHostPort); + brokerUrls.add(getHostPort(brokerName)); } } } @@ -83,6 +96,43 @@ public class ExternalViewReader { return brokerUrls; } + @VisibleForTesting + String getHostPort(String brokerName) { + // Turn Broker_12.34.56.78_1234 into 12.34.56.78:1234, try InstanceConfig first, naming convention as backup + try { + byte[] znStrBytes = _zkClient.readData(BROKER_INSTANCE_PATH + "/" + brokerName, true); + if (znStrBytes != null) { + JsonNode record = OBJECT_READER.readTree(new String(znStrBytes, StandardCharsets.UTF_8)); + if (record != null) { + JsonNode simpleFields = record.get(KEY_SIMPLE_FIELDS); + if (simpleFields != null) { + JsonNode hostNameNode = simpleFields.get(KEY_HELIX_HOST); + JsonNode tlsPortNode = simpleFields.get(KEY_PINOT_TLS_PORT); + JsonNode helixPortNode = simpleFields.get(KEY_HELIX_PORT); + String[] splitItems = brokerName.split("_"); + if (splitItems.length < 3) { + throw new RuntimeException("Wrong BrokerName format " + brokerName); + } + String hostName = splitItems[1]; + if (hostNameNode != null && !Strings.isNullOrEmpty(hostNameNode.asText())) { + hostName = hostNameNode.asText(); + } + if (tlsPortNode != null && !Strings.isNullOrEmpty(tlsPortNode.asText()) && _preferTlsPort) { + return hostName + ":" + tlsPortNode.asText(); + } + if (helixPortNode != null && !Strings.isNullOrEmpty(helixPortNode.asText())) { + return hostName + ":" + helixPortNode.asText(); + } + return hostName + ":" + splitItems[splitItems.length - 1]; + } + } + } + } catch (JsonProcessingException ex) { + LOGGER.error("Failed to read broker instance config for {}. Return by naming convention", brokerName, ex); + } + return brokerName.replace("Broker_", "").replace("_", ":"); + } + protected ByteArrayInputStream getInputStream(byte[] brokerResourceNodeData) { return new ByteArrayInputStream(brokerResourceNodeData); } @@ -90,7 +140,7 @@ public class ExternalViewReader { public Map<String, List<String>> getTableToBrokersMap() { Map<String, Set<String>> brokerUrlsMap = new HashMap<>(); try { - byte[] brokerResourceNodeData = _zkClient.readData("/EXTERNALVIEW/brokerResource", true); + byte[] brokerResourceNodeData = _zkClient.readData(BROKER_EXTERNAL_VIEW_PATH, true); brokerResourceNodeData = unpackZnodeIfNecessary(brokerResourceNodeData); JsonNode jsonObject = OBJECT_READER.readTree(getInputStream(brokerResourceNodeData)); JsonNode brokerResourceNode = jsonObject.get("mapFields"); @@ -107,9 +157,7 @@ public class ExternalViewReader { Entry<String, JsonNode> brokerEntry = brokerEntries.next(); String brokerName = brokerEntry.getKey(); if (brokerName.startsWith("Broker_") && "ONLINE".equals(brokerEntry.getValue().asText())) { - // Turn Broker_12.34.56.78_1234 into 12.34.56.78:1234 - String brokerHostPort = brokerName.replace("Broker_", "").replace("_", ":"); - brokerUrls.add(brokerHostPort); + brokerUrls.add(getHostPort(brokerName)); } } } diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java index c13b930..733aaa0 100644 --- a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java +++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java @@ -62,6 +62,11 @@ public class DynamicBrokerSelectorTest { } @Override + protected ExternalViewReader getEvReader(ZkClient zkClient, boolean preferTlsPort) { + return _mockExternalViewReader; + } + + @Override protected ZkClient getZkClient(String zkServers) { return _mockZkClient; } diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExternalViewReaderTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExternalViewReaderTest.java index 36fb91d..e70e843 100644 --- a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExternalViewReaderTest.java +++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExternalViewReaderTest.java @@ -18,9 +18,11 @@ */ package org.apache.pinot.client; +import com.google.common.collect.ImmutableMap; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -44,6 +46,37 @@ public class ExternalViewReaderTest { private ExternalViewReader _externalViewReaderUnderTest; + private final String _instanceConfigPlain = "{\n" + + " \"id\": \"Broker_12.34.56.78_1234\",\n" + + " \"simpleFields\": {\n" + + " \"HELIX_ENABLED\": \"true\",\n" + + " \"HELIX_ENABLED_TIMESTAMP\": \"1646486555646\",\n" + + " \"HELIX_HOST\": \"first.pug-pinot-broker-headless\",\n" + + " \"HELIX_PORT\": \"8099\"\n" + + " },\n" + + " \"mapFields\": {},\n" + + " \"listFields\": {\n" + + " \"TAG_LIST\": [\n" + + " \"DefaultTenant_BROKER\"\n" + + " ]\n" + + " }\n" + + "}"; + private final String _instanceConfigTls = "{\n" + + " \"id\": \"Broker_12.34.56.78_1234\",\n" + + " \"simpleFields\": {\n" + + " \"HELIX_ENABLED\": \"true\",\n" + + " \"HELIX_ENABLED_TIMESTAMP\": \"1646486555646\",\n" + + " \"HELIX_HOST\": \"first.pug-pinot-broker-headless\",\n" + + " \"HELIX_PORT\": \"8099\",\n" + + " \"PINOT_TLS_PORT\": \"8090\"" + + " },\n" + + " \"mapFields\": {},\n" + + " \"listFields\": {\n" + + " \"TAG_LIST\": [\n" + + " \"DefaultTenant_BROKER\"\n" + + " ]\n" + + " }\n" + + "}"; @BeforeMethod public void setUp() throws Exception { @@ -112,4 +145,80 @@ public class ExternalViewReaderTest { // Verify the results assertEquals(expectedResult, result); } + + @Test + public void testGetBrokersMapByInstanceConfig() { + configureData(_instanceConfigPlain, true); + // Run the test + final Map<String, List<String>> result = _externalViewReaderUnderTest.getTableToBrokersMap(); + final Map<String, List<String>> expectedResult = ImmutableMap.of("field1", + Arrays.asList("first.pug-pinot-broker-headless:8099")); + // Verify the results + assertEquals(expectedResult, result); + } + + private void configureData(String instanceConfigPlain, boolean preferTls) { + when(_mockZkClient.readData(ExternalViewReader.BROKER_EXTERNAL_VIEW_PATH, true)) + .thenReturn("json".getBytes()); + when(_mockZkClient.readData(ExternalViewReader.BROKER_INSTANCE_PATH + "/Broker_12.34.56.78_1234", true)) + .thenReturn(instanceConfigPlain.getBytes(StandardCharsets.UTF_8)); + _externalViewReaderUnderTest._preferTlsPort = preferTls; + } + + @Test + public void testGetBrokerListByInstanceConfigDefault() { + configureData(_instanceConfigPlain, false); + final List<String> brokers = _externalViewReaderUnderTest.getLiveBrokers(); + assertEquals(brokers, Arrays.asList("first.pug-pinot-broker-headless:8099")); + } + + @Test + public void testGetBrokersMapByInstanceConfigTlsDefault() { + configureData(_instanceConfigTls, false); + final Map<String, List<String>> result = _externalViewReaderUnderTest.getTableToBrokersMap(); + final Map<String, List<String>> expectedResult = ImmutableMap.of("field1", + Arrays.asList("first.pug-pinot-broker-headless:8099")); + // Verify the results + assertEquals(expectedResult, result); + } + @Test + public void testGetBrokerListByInstanceConfigTlsDefault() { + configureData(_instanceConfigTls, false); + final List<String> brokers = _externalViewReaderUnderTest.getLiveBrokers(); + assertEquals(brokers, Arrays.asList("first.pug-pinot-broker-headless:8099")); + } + + @Test + public void testGetBrokersMapByInstanceConfigDefault() { + configureData(_instanceConfigPlain, false); + // Run the test + final Map<String, List<String>> result = _externalViewReaderUnderTest.getTableToBrokersMap(); + final Map<String, List<String>> expectedResult = ImmutableMap.of("field1", + Arrays.asList("first.pug-pinot-broker-headless:8099")); + // Verify the results + assertEquals(expectedResult, result); + } + + @Test + public void testGetBrokerListByInstanceConfig() { + configureData(_instanceConfigPlain, true); + final List<String> brokers = _externalViewReaderUnderTest.getLiveBrokers(); + assertEquals(brokers, Arrays.asList("first.pug-pinot-broker-headless:8099")); + } + + @Test + public void testGetBrokersMapByInstanceConfigTls() { + configureData(_instanceConfigTls, true); + final Map<String, List<String>> result = _externalViewReaderUnderTest.getTableToBrokersMap(); + final Map<String, List<String>> expectedResult = ImmutableMap.of("field1", + Arrays.asList("first.pug-pinot-broker-headless:8090")); + // Verify the results + assertEquals(expectedResult, result); + } + @Test + public void testGetBrokerListByInstanceConfigTls() { + configureData(_instanceConfigTls, true); + final List<String> brokers = _externalViewReaderUnderTest.getLiveBrokers(); + assertEquals(brokers, Arrays.asList("first.pug-pinot-broker-headless:8090")); + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/helix/ExtraInstanceConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/helix/ExtraInstanceConfig.java new file mode 100644 index 0000000..6982093 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/helix/ExtraInstanceConfig.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.common.helix; + +import org.apache.helix.model.InstanceConfig; + + +/** + * Pinot extended Instance Config for pinot extra configuration like TlsPort, etc + */ +public class ExtraInstanceConfig { + + private final InstanceConfig _proxy; + + public enum PinotInstanceConfigProperty { + PINOT_TLS_PORT + } + + public ExtraInstanceConfig(InstanceConfig proxy) { + _proxy = proxy; + } + + public String getTlsPort() { + return _proxy.getRecord().getSimpleField(PinotInstanceConfigProperty.PINOT_TLS_PORT.toString()); + } + + public void setTlsPort(String tlsPort) { + _proxy.getRecord().setSimpleField(PinotInstanceConfigProperty.PINOT_TLS_PORT.toString(), tlsPort); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java index 8534ede..0144a1c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java @@ -46,6 +46,7 @@ import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.pinot.common.helix.ExtraInstanceConfig; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.spi.config.table.TableConfig; @@ -635,6 +636,18 @@ public class HelixHelper { } /** + * Updates a tlsPort value into Pinot instance config so it can be retrieved later + * @param instanceConfig the instance config to update + * @param tlsPort the tlsPort number + * @return true if updated + */ + public static boolean updateTlsPort(InstanceConfig instanceConfig, int tlsPort) { + ExtraInstanceConfig pinotInstanceConfig = new ExtraInstanceConfig(instanceConfig); + pinotInstanceConfig.setTlsPort(String.valueOf(tlsPort)); + return true; + } + + /** * Adds default tags to the instance config if no tag exists, returns {@code true} if the default tags are added, * {@code false} otherwise. * <p>The {@code defaultTagsSupplier} is a function which is only invoked when the instance does not have any tag. diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index f60904a..6a3e9a5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -127,6 +127,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { protected String _helixClusterName; protected String _hostname; protected int _port; + protected int _tlsPort; protected String _helixControllerInstanceId; protected String _helixParticipantInstanceId; protected boolean _isUpdateStateModel; @@ -168,6 +169,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { inferHostnameIfNeeded(_config); _hostname = _config.getControllerHost(); _port = _listenerConfigs.get(0).getPort(); + _tlsPort = ListenerConfigUtil.findLastTlsPort(_listenerConfigs, 0); // NOTE: Use <hostname>_<port> as Helix controller instance id because ControllerLeaderLocator relies on this format // to parse the leader controller's hostname and port // TODO: Use the same instance id for controller and participant when leadControllerResource is always enabled after @@ -607,6 +609,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_helixParticipantManager, _helixParticipantInstanceId); boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port); + if (_tlsPort > 0) { + updated |= HelixHelper.updateTlsPort(instanceConfig, _tlsPort); + } updated |= HelixHelper .addDefaultTags(instanceConfig, () -> Collections.singletonList(CommonConstants.Helix.CONTROLLER_INSTANCE)); if (updated) { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/util/ListenerConfigUtilTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/util/ListenerConfigUtilTest.java index f6c6a38..c92df08 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/util/ListenerConfigUtilTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/util/ListenerConfigUtilTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.util; +import com.google.common.collect.ImmutableList; import java.util.List; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.core.transport.ListenerConfig; @@ -172,6 +173,31 @@ public class ListenerConfigUtilTest { ListenerConfigUtil.buildControllerConfigs(controllerConf); } + @Test + public void testFindLastTlsPort() { + List<ListenerConfig> configs = ImmutableList.of( + new ListenerConfig("conf1", "host1", 9000, "http", null), + new ListenerConfig("conf2", "host2", 9001, "https", null), + new ListenerConfig("conf3", "host3", 9002, "http", null), + new ListenerConfig("conf4", "host4", 9003, "https", null), + new ListenerConfig("conf5", "host5", 9004, "http", null) + ); + int tlsPort = ListenerConfigUtil.findLastTlsPort(configs, -1); + Assert.assertEquals(tlsPort, 9003); + } + + @Test + public void testFindLastTlsPortMissing() { + List<ListenerConfig> configs = ImmutableList.of( + new ListenerConfig("conf1", "host1", 9000, "http", null), + new ListenerConfig("conf2", "host2", 9001, "http", null), + new ListenerConfig("conf3", "host3", 9002, "http", null), + new ListenerConfig("conf4", "host4", 9004, "http", null) + ); + int tlsPort = ListenerConfigUtil.findLastTlsPort(configs, -1); + Assert.assertEquals(tlsPort, -1); + } + private void assertLegacyListener(ListenerConfig legacyListener) { Assert.assertEquals(legacyListener.getName(), "http"); Assert.assertEquals(legacyListener.getHost(), "0.0.0.0"); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/ListenerConfigUtil.java b/pinot-core/src/main/java/org/apache/pinot/core/util/ListenerConfigUtil.java index 3aa8378..7de0596 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/ListenerConfigUtil.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/ListenerConfigUtil.java @@ -50,6 +50,8 @@ import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder; import org.glassfish.jersey.process.JerseyProcessingUncaughtExceptionHandler; import org.glassfish.jersey.server.ResourceConfig; +import static org.apache.pinot.spi.utils.CommonConstants.HTTPS_PROTOCOL; + /** * Utility class that generates Http {@link ListenerConfig} instances @@ -239,6 +241,20 @@ public final class ListenerConfigUtil { httpServer.addListener(listener); } + /** + * Finds the last listener that has HTTPS protocol, and returns its port. If not found any TLS, return defaultValue + * @param configs the config to search + * @param defaultValue the default value if the TLS listener is not found + * @return the port number of last entry that has secure protocol. If not found then defaultValue + */ + public static int findLastTlsPort(List<ListenerConfig> configs, int defaultValue) { + return configs.stream() + .filter(config -> config.getProtocol().equalsIgnoreCase(HTTPS_PROTOCOL)) + .map(ListenerConfig::getPort) + .reduce((first, second) -> second) + .orElse(defaultValue); + } + private static SSLEngineConfigurator buildSSLEngineConfigurator(TlsConfig tlsConfig) { SSLContextConfigurator sslContextConfigurator = new SSLContextConfigurator(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java index 2070aa0..778ac91 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java @@ -28,9 +28,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.io.FileUtils; +import org.apache.helix.model.InstanceConfig; import org.apache.http.Header; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -45,7 +47,9 @@ import org.apache.pinot.client.ConnectionFactory; import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory; import org.apache.pinot.client.Request; import org.apache.pinot.client.ResultSetGroup; +import org.apache.pinot.common.helix.ExtraInstanceConfig; import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.integration.tests.access.CertBasedTlsChannelAccessControlFactory; @@ -277,6 +281,17 @@ public class TlsIntegrationTest extends BaseClusterIntegrationTest { } @Test + public void testUpdatedBrokerTlsPort() { + + List<InstanceConfig> instanceConfigs = HelixHelper.getInstanceConfigs(_helixManager); + List<ExtraInstanceConfig> securedInstances = + instanceConfigs.stream().map(ExtraInstanceConfig::new) + .filter(pinotInstanceConfig -> pinotInstanceConfig.getTlsPort() != null) + .collect(Collectors.toList()); + Assert.assertFalse(securedInstances.isEmpty()); + } + + @Test public void testControllerConfigValidation() throws Exception { PinotConfigUtils.validateControllerConfig(new ControllerConf(getDefaultControllerConfiguration())); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org