This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 82f7aefe3d  Ensure all records are pushed in Pulsar Consumer Test (#8554)
82f7aefe3d is described below

commit 82f7aefe3d0e9e9cee5ba519279a1425259210ce
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Mon Apr 18 10:11:16 2022 +0530

    Ensure all records are pushed in Pulsar Consumer Test (#8554)

    * Use testcontainers instead of standalone CLI; Enable checks for all records to get pushed
    * remove pulsar-broker dependency
    * Add debug logging to pulsar consumer
    * Adding re-runs just to verify flakiness, will remove later
    * Increase timeout
    * lint fix
    * remove re-runs and increase timeout

    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../pinot-stream-ingestion/pinot-pulsar/pom.xml    | 154 +---
 .../plugin/stream/pulsar/PulsarConsumerTest.java   | 128 ++-
 .../stream/pulsar/PulsarStandaloneCluster.java     | 132 ---
 .../src/test/resources/standalone.properties       | 904 ---------------------
 4 files changed, 111 insertions(+), 1207 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml
index d9446d1e53..3956d64284 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml
@@ -68,6 +68,12 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>pulsar</artifactId>
+      <version>1.17.1</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-client-original</artifactId>
@@ -108,154 +114,6 @@
       <artifactId>pulsar-client-admin-original</artifactId>
       <version>${pulsar.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-broker</artifactId>
-      <version>${pulsar.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.pulsar</groupId>
-          <artifactId>pulsar-client-original</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.pulsar</groupId>
-          <artifactId>pulsar-client-admin-original</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-resolver</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.ws.rs</groupId>
-          <artifactId>javax.ws.rs-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.prometheus</groupId>
-          <artifactId>simpleclient_common</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.api.grpc</groupId>
-          <artifactId>proto-google-common-protos</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.grpc</groupId>
-          <artifactId>grpc-context</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-tcnative-boringssl-static</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-codec</groupId>
-          <artifactId>commons-codec</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.errorprone</groupId>
-          <artifactId>error_prone_annotations</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.glassfish.jersey.containers</groupId>
-          <artifactId>jersey-container-servlet-core</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.prometheus</groupId>
-          <artifactId>simpleclient</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-server</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-servlet</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-io</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-http</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.squareup.okio</groupId>
-          <artifactId>okio</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.prometheus</groupId>
-          <artifactId>simpleclient_hotspot</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.swagger</groupId>
-          <artifactId>swagger-annotations</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.grpc</groupId>
-          <artifactId>grpc-protobuf-lite</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.opencensus</groupId>
-          <artifactId>opencensus-contrib-grpc-metrics</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.opencensus</groupId>
-          <artifactId>opencensus-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.commons</groupId>
-          <artifactId>commons-collections4</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.annotation</groupId>
-          <artifactId>javax.annotation-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.mojo</groupId>
-          <artifactId>animal-sniffer-annotations</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.github.ben-manes.caffeine</groupId>
-          <artifactId>caffeine</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.beust</groupId>
-          <artifactId>jcommander</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>javax.servlet-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-codec-socks</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-transport-native-unix-common</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>jakarta.ws.rs</groupId>
-          <artifactId>jakarta.ws.rs-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.bouncycastle</groupId>
-          <artifactId>bcpkix-jdk15on</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.bouncycastle</groupId>
-          <artifactId>bcprov-ext-jdk15on</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.protobuf</groupId>
-          <artifactId>protobuf-java</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <dependency>
       <groupId>javax.servlet</groupId>
       <artifactId>javax.servlet-api</artifactId>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
index 53ad46a728..527b76bc27 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.plugin.stream.pulsar;
 
+import com.google.common.base.Function;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
@@ -30,6 +33,7 @@ import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRouter;
@@ -40,6 +44,11 @@ import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.utility.DockerImageName;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -47,7 +56,9 @@ import org.testng.annotations.Test;
 
 
 public class PulsarConsumerTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerTest.class);
+  private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.7.2");
   public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
   public static final String TEST_TOPIC = "test-topic";
   public static final String TEST_TOPIC_BATCH = "test-topic-batch";
@@ -57,9 +68,10 @@ public class PulsarConsumerTest {
   public static final int NUM_PARTITION = 1;
   public static final int NUM_RECORDS_PER_PARTITION = 1000;
   public static final int BATCH_SIZE = 10;
+  public static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) Duration.ofMinutes(5).toMillis();
 
   private PulsarClient _pulsarClient;
-  private PulsarStandaloneCluster _pulsarStandaloneCluster;
+  private PulsarContainer _pulsar = null;
   private HashMap<Integer, MessageId> _partitionToFirstMessageIdMap = new HashMap<>();
   private HashMap<Integer, MessageId> _partitionToFirstMessageIdMapBatch = new HashMap<>();
 
@@ -67,35 +79,79 @@ public class PulsarConsumerTest {
   public void setUp()
       throws Exception {
     try {
-      _pulsarStandaloneCluster = new PulsarStandaloneCluster();
+      _pulsar = new PulsarContainer(PULSAR_IMAGE).withStartupTimeout(Duration.ofMinutes(5));
+      _pulsar.start();
-      _pulsarStandaloneCluster.start();
+      // Waiting for namespace to be created.
+      // There should be a better approach.
+      Thread.sleep(20 * 1000L);
 
-      PulsarAdmin admin =
-          PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + _pulsarStandaloneCluster.getAdminPort()).build();
+      PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(_pulsar.getHttpServiceUrl()).build();
 
-      String bootstrapServer = "pulsar://localhost:" + _pulsarStandaloneCluster.getBrokerPort();
+      String bootstrapServer = _pulsar.getPulsarBrokerUrl();
 
       _pulsarClient = PulsarClient.builder().serviceUrl(bootstrapServer).build();
 
-      admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITION);
-      admin.topics().createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITION);
+      createTopics(admin);
 
       publishRecords();
       publishRecordsBatch();
+
+      waitForMessagesToPublish(admin, TEST_TOPIC);
+      waitForMessagesToPublish(admin, TEST_TOPIC_BATCH);
+
+      admin.close();
     } catch (Exception e) {
-      if (_pulsarStandaloneCluster != null) {
-        _pulsarStandaloneCluster.stop();
+      if (_pulsar != null) {
+        _pulsar.stop();
+        _pulsar = null;
       }
       throw new RuntimeException("Failed to setUp test environment", e);
     }
   }
 
+  private void createTopics(PulsarAdmin admin)
+      throws PulsarAdminException {
+    InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
+    inactiveTopicPolicies.setDeleteWhileInactive(false);
+    admin.namespaces().setInactiveTopicPolicies("public/default", inactiveTopicPolicies);
+
+    admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITION);
+    admin.topics().createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITION);
+  }
+
+  private void waitForMessagesToPublish(PulsarAdmin admin, String topicName) {
+    waitForCondition(new Function<Void, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable Void aVoid) {
+        try {
+          return getNumberOfEntries(admin, topicName) == NUM_RECORDS_PER_PARTITION * NUM_PARTITION;
+        } catch (Exception e) {
+          LOGGER.warn("Could not fetch number of messages in pulsar topic " + topicName, e);
+          return null;
+        }
+      }
+    }, 2000L, 60 * 1000L, "Failed to produce " + NUM_RECORDS_PER_PARTITION * NUM_PARTITION + " messages", true);
+  }
+
+  private long getNumberOfEntries(PulsarAdmin admin, String topicName) {
+    try {
+      return admin.topics().getPartitionedStats(topicName, false).msgInCounter;
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOGGER.warn("Could not fetch number of rows in pulsar topic " + topicName, e);
+    }
+    return -1;
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
-    if (_pulsarStandaloneCluster != null) {
-      _pulsarStandaloneCluster.stop();
+    if (_pulsar != null) {
+      _pulsar.stop();
+      _pulsarClient.close();
+      _pulsar = null;
     }
   }
@@ -149,7 +205,7 @@ public class PulsarConsumerTest {
   public StreamConfig getStreamConfig(String topicName) {
     String streamType = "pulsar";
-    String streamPulsarBrokerList = "pulsar://localhost:" + _pulsarStandaloneCluster.getBrokerPort();
+    String streamPulsarBrokerList = _pulsar.getPulsarBrokerUrl();
     String streamPulsarConsumerType = "simple";
 
     String tableNameWithType = TABLE_NAME_WITH_TYPE;
@@ -180,7 +236,8 @@
     final StreamConsumerFactory streamConsumerFactory =
         StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC));
     int numPartitions =
-        new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig(TEST_TOPIC)).fetchPartitionCount(10000);
+        new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig(TEST_TOPIC)).fetchPartitionCount(
+            CONSUMER_FETCH_TIMEOUT_MILLIS);
 
     for (int partition = 0; partition < numPartitions; partition++) {
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
@@ -192,7 +249,8 @@ public class PulsarConsumerTest {
       final PartitionGroupConsumer consumer =
           streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
 
       final MessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
-          new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), 10000);
+          new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)),
+          CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch1.getMessageCount(), 500);
       for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
@@ -202,7 +260,7 @@ public class PulsarConsumerTest {
 
       final MessageBatch messageBatch2 =
           consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), null,
-              10000);
+              CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch2.getMessageCount(), 500);
       for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
@@ -212,7 +270,8 @@ public class PulsarConsumerTest {
 
       final MessageBatch messageBatch3 =
           consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 10)),
-              new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 35)), 10000);
+              new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 35)),
+              CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch3.getMessageCount(), 25);
       for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
@@ -230,7 +289,8 @@ public class PulsarConsumerTest {
     final StreamConsumerFactory streamConsumerFactory =
         StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC_BATCH));
     int numPartitions =
-        new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig(TEST_TOPIC_BATCH)).fetchPartitionCount(10000);
+        new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig(TEST_TOPIC_BATCH))
+            .fetchPartitionCount(CONSUMER_FETCH_TIMEOUT_MILLIS);
 
     for (int partition = 0; partition < numPartitions; partition++) {
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
@@ -241,8 +301,10 @@ public class PulsarConsumerTest {
       final PartitionGroupConsumer consumer =
           streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
 
+      //TODO: This test failed, check it out.
       final MessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
-          new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)), 10000);
+          new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)),
+          CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch1.getMessageCount(), 500);
       for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
@@ -252,7 +314,7 @@ public class PulsarConsumerTest {
 
       final MessageBatch messageBatch2 =
           consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)), null,
-              10000);
+              CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch2.getMessageCount(), 500);
       for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
@@ -262,7 +324,8 @@ public class PulsarConsumerTest {
 
      final MessageBatch messageBatch3 =
          consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 10)),
-              new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 35)), 10000);
+              new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 35)),
+              CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch3.getMessageCount(), 25);
       for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
@@ -273,16 +336,35 @@ public class PulsarConsumerTest {
     }
   }
 
-  public MessageId getMessageIdForPartitionAndIndex(int partitionNum, int index) {
+  private MessageId getMessageIdForPartitionAndIndex(int partitionNum, int index) {
     MessageId startMessageIdRaw = _partitionToFirstMessageIdMap.get(partitionNum);
     MessageIdImpl startMessageId = MessageIdImpl.convertToMessageIdImpl(startMessageIdRaw);
     return DefaultImplementation.newMessageId(startMessageId.getLedgerId(), index, partitionNum);
   }
 
-  public MessageId getBatchMessageIdForPartitionAndIndex(int partitionNum, int index) {
+  private MessageId getBatchMessageIdForPartitionAndIndex(int partitionNum, int index) {
     MessageId startMessageIdRaw = _partitionToFirstMessageIdMapBatch.get(partitionNum);
     BatchMessageIdImpl startMessageId = (BatchMessageIdImpl) MessageIdImpl.convertToMessageIdImpl(startMessageIdRaw);
     return new BatchMessageIdImpl(startMessageId.getLedgerId(), index / BATCH_SIZE, partitionNum, index % BATCH_SIZE,
         startMessageId.getBatchSize(), startMessageId.getAcker());
   }
+
+  private void waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs,
+      @Nullable String errorMessage, boolean raiseError) {
+    long endTime = System.currentTimeMillis() + timeoutMs;
+    String errorMessageSuffix = errorMessage != null ? ", error message: " + errorMessage : "";
+    while (System.currentTimeMillis() < endTime) {
+      try {
+        if (Boolean.TRUE.equals(condition.apply(null))) {
+          return;
+        }
+        Thread.sleep(checkIntervalMs);
+      } catch (Exception e) {
+        Assert.fail("Caught exception while checking the condition" + errorMessageSuffix, e);
+      }
+    }
+    if (raiseError) {
+      Assert.fail("Failed to meet condition in " + timeoutMs + "ms" + errorMessageSuffix);
+    }
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarStandaloneCluster.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarStandaloneCluster.java
deleted file mode 100644
index d933f7f268..0000000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarStandaloneCluster.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.pulsar;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.util.Optional;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.PulsarStandalone;
-import org.apache.pulsar.PulsarStandaloneBuilder;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class PulsarStandaloneCluster {
-  private static final Logger LOGGER = LoggerFactory.getLogger(PulsarStandaloneCluster.class);
-
-  public static final String DEFAULT_STANDALONE_CONF = "standalone.properties";
-  public static final String DEFAULT_ZK_DIR = "pulsar-zk";
-  public static final String DEFAULT_BK_DIR = "pulsar-bookeeper";
-
-  private Integer _brokerPort;
-  private Integer _adminPort;
-  private String _zkDir;
-  private String _bkDir;
-
-  private PulsarStandalone _pulsarStandalone;
-  private File _tempDir;
-
-  public void setBrokerPort(Integer brokerPort) {
-    _brokerPort = brokerPort;
-  }
-
-  public void setAdminPort(Integer adminPort) {
-    _adminPort = adminPort;
-  }
-
-  public void setZkDir(String zkDir) {
-    _zkDir = zkDir;
-  }
-
-  public void setBkDir(String bkDir) {
-    _bkDir = bkDir;
-  }
-
-  public Integer getBrokerPort() {
-    return _brokerPort;
-  }
-
-  public Integer getAdminPort() {
-    return _adminPort;
-  }
-
-  public void start()
-      throws Exception {
-    final File clusterConfigFile = new File(getClass().getClassLoader().getResource(DEFAULT_STANDALONE_CONF).toURI());
-
-    String zkDir = StringUtils.isBlank(_zkDir) ? DEFAULT_ZK_DIR : _zkDir;
-    String bkDir = StringUtils.isBlank(_bkDir) ?
DEFAULT_BK_DIR : _bkDir; - _tempDir = FileUtils.getTempDirectory(); - File zkDirFile = new File(_tempDir, zkDir); - File bkDirFile = new File(_tempDir, bkDir); - zkDirFile.mkdirs(); - bkDirFile.mkdirs(); - - ServiceConfiguration config = - PulsarConfigurationLoader.create((new FileInputStream(clusterConfigFile)), ServiceConfiguration.class); - config.setManagedLedgerDefaultEnsembleSize(1); - config.setManagedLedgerDefaultWriteQuorum(1); - config.setManagedLedgerDefaultAckQuorum(1); - String zkServers = "127.0.0.1"; - config.setAdvertisedAddress("localhost"); - - _pulsarStandalone = PulsarStandaloneBuilder.instance().withConfig(config).withNoStreamStorage(true).build(); - _pulsarStandalone.setZkDir(zkDirFile.getAbsolutePath()); - _pulsarStandalone.setBkDir(bkDirFile.getAbsolutePath()); - - if (config.getZookeeperServers() != null) { - _pulsarStandalone.setZkPort(Integer.parseInt(config.getZookeeperServers().split(":")[1])); - } - - config.setZookeeperServers(zkServers + ":" + _pulsarStandalone.getZkPort()); - config.setConfigurationStoreServers(zkServers + ":" + _pulsarStandalone.getZkPort()); - - config.setRunningStandalone(true); - - if (_brokerPort != null) { - config.setBrokerServicePort(Optional.of(_brokerPort)); - } else { - _brokerPort = config.getBrokerServicePort().get(); - } - - if (_adminPort != null) { - config.setWebServicePort(Optional.of(_adminPort)); - } else { - _adminPort = config.getWebServicePort().get(); - } - - _pulsarStandalone.setConfigFile(clusterConfigFile.getAbsolutePath()); - _pulsarStandalone.setConfig(config); - - _pulsarStandalone.start(); - } - - public void stop() { - try { - _pulsarStandalone.close(); - _tempDir.delete(); - } catch (Exception e) { - LOGGER.warn("Failed to stop embedded pulsar and zookeeper", e); - } - } -} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/resources/standalone.properties b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/resources/standalone.properties deleted file mode 100644 index 452161b570..0000000000 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/resources/standalone.properties +++ /dev/null @@ -1,904 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -### --- General broker settings --- ### - -# Zookeeper quorum connection string -zookeeperServers= - -# Configuration Store connection string -configurationStoreServers= - -brokerServicePort=6650 - -# Port to use to server HTTP request -webServicePort=8080 - -# Hostname or IP address the service binds on, default is 0.0.0.0. -bindAddress=0.0.0.0 - -# Hostname or IP address the service advertises to the outside world. If not set, the value of -# InetAddress.getLocalHost().getHostName() is used. -advertisedAddress= - -# Enable or disable the HAProxy protocol. 
-haProxyProtocolEnabled=false - -# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors() -numIOThreads= - -# Number of threads to use for ordered executor. The ordered executor is used to operate with zookeeper, -# such as init zookeeper client, get namespace policies from zookeeper etc. It also used to split bundle. Default is 8 -numOrderedExecutorThreads=8 - -# Number of threads to use for HTTP requests processing. -# Default is set to 2 * Runtime.getRuntime().availableProcessors() -numHttpServerThreads= - -# Number of thread pool size to use for pulsar broker service. -# The executor in thread pool will do basic broker operation like load/unload bundle, update managedLedgerConfig, -# update topic/subscription/replicator message dispatch rate, do leader election etc. -# Default is Runtime.getRuntime().availableProcessors() -numExecutorThreadPoolSize= - -# Number of thread pool size to use for pulsar zookeeper callback service -# The cache executor thread pool is used for restarting global zookeeper session. -# Default is 10 -numCacheExecutorThreadPoolSize=10 - -# Max concurrent web requests -maxConcurrentHttpRequests=1024 - -# Name of the cluster to which this broker belongs to -clusterName=standalone - -# Enable cluster's failure-domain which can distribute brokers into logical region -failureDomainsEnabled=false - -# Zookeeper session timeout in milliseconds -zooKeeperSessionTimeoutMillis=30000 - -# ZooKeeper operation timeout in seconds -zooKeeperOperationTimeoutSeconds=30 - -# ZooKeeper cache expiry time in seconds -zooKeeperCacheExpirySeconds=300 - -# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed -brokerShutdownTimeoutMs=60000 - -# Flag to skip broker shutdown when broker handles Out of memory error -skipBrokerShutdownOnOOM=false - -# Enable backlog quota check. Enforces action on topic when the quota is reached -backlogQuotaCheckEnabled=true - -# How often to check for topics that have reached the quota -backlogQuotaCheckIntervalInSeconds=60 - -# Default per-topic backlog quota limit -backlogQuotaDefaultLimitGB=10 - -# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) -ttlDurationDefaultInSeconds=0 - -# Enable the deletion of inactive topics -brokerDeleteInactiveTopicsEnabled=false - -# How often to check for inactive topics -brokerDeleteInactiveTopicsFrequencySeconds=60 - -# Max pending publish requests per connection to avoid keeping large number of pending -# requests in memory. Default: 1000 -maxPendingPublishdRequestsPerConnection=1000 - -# How frequently to proactively check and purge expired messages -messageExpiryCheckIntervalInMinutes=1000 - -# How long to delay rewinding cursor and dispatching messages when active consumer is changed -activeConsumerFailoverDelayTimeMillis=1000 - -# How long to delete inactive subscriptions from last consuming -# When it is 0, inactive subscriptions are not deleted automatically -subscriptionExpirationTimeMinutes=0 - -# Enable subscription message redelivery tracker to send redelivery count to consumer (default is enabled) -subscriptionRedeliveryTrackerEnabled=true - -# On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or -# consistent hashing to reassign keys to new consumers -subscriptionKeySharedUseConsistentHashing=false - -# On KeyShared subscriptions, number of points in the consistent-hashing ring. 
-# The higher the number, the more equal the assignment of keys to consumers -subscriptionKeySharedConsistentHashingReplicaPoints=100 - -# How frequently to proactively check and purge expired subscription -subscriptionExpiryCheckIntervalInMinutes=5 - -# Set the default behavior for message deduplication in the broker -# This can be overridden per-namespace. If enabled, broker will reject -# messages that were already stored in the topic -brokerDeduplicationEnabled=false - -# Maximum number of producer information that it's going to be -# persisted for deduplication purposes -brokerDeduplicationMaxNumberOfProducers=10000 - -# Number of entries after which a dedup info snapshot is taken. -# A bigger interval will lead to less snapshots being taken though it would -# increase the topic recovery time, when the entries published after the -# snapshot need to be replayed -brokerDeduplicationEntriesInterval=1000 - -# Time of inactivity after which the broker will discard the deduplication information -# relative to a disconnected producer. Default is 6 hours. -brokerDeduplicationProducerInactivityTimeoutMinutes=360 - -# When a namespace is created without specifying the number of bundle, this -# value will be used as the default -defaultNumberOfNamespaceBundles=4 - -# Enable check for minimum allowed client library version -clientLibraryVersionCheckEnabled=false - -# Path for the file used to determine the rotation status for the broker when responding -# to service discovery health checks -statusFilePath=/usr/local/apache/htdocs - -# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will -# stop sending messages to consumer once, this limit reaches until consumer starts acknowledging messages back -# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction -maxUnackedMessagesPerConsumer=50000 - -# Max number of unacknowledged messages allowed per shared subscription. Broker will stop dispatching messages to -# all consumers of the subscription once this limit reaches until consumer starts acknowledging messages back and -# unack count reaches to limit/2. Using a value of 0, is disabling unackedMessage-limit -# check and dispatcher can dispatch messages without any restriction -maxUnackedMessagesPerSubscription=200000 - -# Max number of unacknowledged messages allowed per broker. Once this limit reaches, broker will stop dispatching -# messages to all shared subscription which has higher number of unack messages until subscriptions start -# acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling -# unackedMessage-limit check and broker doesn't block dispatchers -maxUnackedMessagesPerBroker=0 - -# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages -# than this percentage limit and subscription will not receive any new messages until that subscription acks back -# limit/2 messages -maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 - -# Tick time to schedule task that checks topic publish rate limiting across all topics -# Reducing to lower value can give more accuracy while throttling publish but -# it uses more CPU to perform frequent check. 
(Disable publish throttling with value 0) -topicPublisherThrottlingTickTimeMillis=2 - -# Tick time to schedule task that checks broker publish rate limiting across all topics -# Reducing to lower value can give more accuracy while throttling publish but -# it uses more CPU to perform frequent check. (Disable publish throttling with value 0) -brokerPublisherThrottlingTickTimeMillis=50 - -# Max Rate(in 1 seconds) of Message allowed to publish for a broker if broker publish rate limiting enabled -# (Disable message rate limit with value 0) -brokerPublisherThrottlingMaxMessageRate=0 - -# Max Rate(in 1 seconds) of Byte allowed to publish for a broker if broker publish rate limiting enabled -# (Disable byte rate limit with value 0) -brokerPublisherThrottlingMaxByteRate=0 - -# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default -# message dispatch-throttling -dispatchThrottlingRatePerTopicInMsg=0 - -# Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling -# default message-byte dispatch-throttling -dispatchThrottlingRatePerTopicInByte=0 - -# Dispatch rate-limiting relative to publish rate. -# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate: -# throttle-dispatch-rate = (publish-rate + configured dispatch-rate). -dispatchThrottlingRateRelativeToPublishRate=false - -# By default we enable dispatch-throttling for both caught up consumers as well as consumers who have -# backlog. -dispatchThrottlingOnNonBacklogConsumerEnabled=true - -# Precise dispathcer flow control according to history message number of each entry -preciseDispatcherFlowControl=false - -# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic -maxConcurrentLookupRequest=50000 - -# Max number of concurrent topic loading request broker allows to control number of zk-operations -maxConcurrentTopicLoadRequest=5000 - -# Max concurrent non-persistent message can be processed per connection -maxConcurrentNonPersistentMessagePerConnection=1000 - -# Number of worker threads to serve non-persistent topic -numWorkerThreadsForNonPersistentTopic=8 - -# Enable broker to load persistent topics -enablePersistentTopics=true - -# Enable broker to load non-persistent topics -enableNonPersistentTopics=true - -# Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers -# until the number of connected producers decrease. -# Using a value of 0, is disabling maxProducersPerTopic-limit check. -maxProducersPerTopic=0 - -# Enforce producer to publish encrypted messages.(default disable). -encryptionRequireOnProducer=false - -# Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers -# until the number of connected consumers decrease. -# Using a value of 0, is disabling maxConsumersPerTopic-limit check. -maxConsumersPerTopic=0 - -# Max number of subscriptions allowed to subscribe to topic. Once this limit reaches, broker will reject -# new subscription until the number of subscribed subscriptions decrease. -# Using a value of 0, is disabling maxSubscriptionsPerTopic limit check. -maxSubscriptionsPerTopic=0 - -# Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers -# until the number of connected consumers decrease. -# Using a value of 0, is disabling maxConsumersPerSubscription-limit check. 
-maxConsumersPerSubscription=0 - -# Max number of partitions per partitioned topic -# Use 0 or negative number to disable the check -maxNumPartitionsPerPartitionedTopic=0 - -### --- TLS --- ### -# Deprecated - Use webServicePortTls and brokerServicePortTls instead -tlsEnabled=false - -# Tls cert refresh duration in seconds (set 0 to check on every new connection) -tlsCertRefreshCheckDurationSec=300 - -# Path for the TLS certificate file -tlsCertificateFilePath= - -# Path for the TLS private key file -tlsKeyFilePath= - -# Path for the trusted TLS certificate file. -# This cert is used to verify that any certs presented by connecting clients -# are signed by a certificate authority. If this verification -# fails, then the certs are untrusted and the connections are dropped. -tlsTrustCertsFilePath= - -# Accept untrusted TLS certificate from client. -# If true, a client with a cert which cannot be verified with the -# 'tlsTrustCertsFilePath' cert will allowed to connect to the server, -# though the cert will not be used for client authentication. -tlsAllowInsecureConnection=false - -# Specify the tls protocols the broker will use to negotiate during TLS handshake -# (a comma-separated list of protocol names). -# Examples:- [TLSv1.2, TLSv1.1, TLSv1] -tlsProtocols= - -# Specify the tls cipher the broker will use to negotiate during TLS Handshake -# (a comma-separated list of ciphers). -# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256] -tlsCiphers= - -# Trusted client certificates are required for to connect TLS -# Reject the Connection if the Client Certificate is not trusted. -# In effect, this requires that all connecting clients perform TLS client -# authentication. -tlsRequireTrustedClientCertOnConnect=false - -### --- KeyStore TLS config variables --- ### -# Enable TLS with KeyStore type configuration in broker. -tlsEnabledWithKeyStore=false - -# TLS Provider for KeyStore type -tlsProvider= - -# TLS KeyStore type configuration in broker: JKS, PKCS12 -tlsKeyStoreType=JKS - -# TLS KeyStore path in broker -tlsKeyStore= - -# TLS KeyStore password for broker -tlsKeyStorePassword= - -# TLS TrustStore type configuration in broker: JKS, PKCS12 -tlsTrustStoreType=JKS - -# TLS TrustStore path in broker -tlsTrustStore= - -# TLS TrustStore password for broker -tlsTrustStorePassword= - -# Whether internal client use KeyStore type to authenticate with Pulsar brokers -brokerClientTlsEnabledWithKeyStore=false - -# The TLS Provider used by internal client to authenticate with other Pulsar brokers -brokerClientSslProvider= - -# TLS TrustStore type configuration for internal client: JKS, PKCS12 -# used by the internal client to authenticate with Pulsar brokers -brokerClientTlsTrustStoreType=JKS - -# TLS TrustStore path for internal client -# used by the internal client to authenticate with Pulsar brokers -brokerClientTlsTrustStore= - -# TLS TrustStore password for internal client, -# used by the internal client to authenticate with Pulsar brokers -brokerClientTlsTrustStorePassword= - -# Specify the tls cipher the internal client will use to negotiate during TLS Handshake -# (a comma-separated list of ciphers) -# e.g. [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]. -# used by the internal client to authenticate with Pulsar brokers -brokerClientTlsCiphers= - -# Specify the tls protocols the broker will use to negotiate during TLS handshake -# (a comma-separated list of protocol names). -# e.g. 
[TLSv1.2, TLSv1.1, TLSv1] -# used by the internal client to authenticate with Pulsar brokers -brokerClientTlsProtocols= - -# Enable or disable system topic -systemTopicEnabled=false - -# Enable or disable topic level policies, topic level policies depends on the system topic -# Please enable the system topic first. -topicLevelPoliciesEnabled=false - -# If a topic remains fenced for this number of seconds, it will be closed forcefully. -# If it is set to 0 or a negative number, the fenced topic will not be closed. -topicFencingTimeoutSeconds=0 - -### --- Authentication --- ### -# Role names that are treated as "proxy roles". If the broker sees a request with -#role as proxyRoles - it will demand to see a valid original principal. -proxyRoles= - -# If this flag is set then the broker authenticates the original Auth data -# else it just accepts the originalPrincipal and authorizes it (if required). -authenticateOriginalAuthData=false - -# Enable authentication -authenticationEnabled=false - -# Autentication provider name list, which is comma separated list of class names -authenticationProviders= - -# Enforce authorization -authorizationEnabled=false - -# Authorization provider fully qualified class-name -authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider - -# Allow wildcard matching in authorization -# (wildcard matching only applicable if wildcard-char: -# * presents at first or last position eg: *.pulsar.service, pulsar.service.*) -authorizationAllowWildcardsMatching=false - -# Role names that are treated as "super-user", meaning they will be able to do all admin -# operations and publish/consume from all topics -superUserRoles= - -# Authentication settings of the broker itself. Used when the broker connects to other brokers, -# either in same or other clusters -brokerClientAuthenticationPlugin= -brokerClientAuthenticationParameters= - -# Supported Athenz provider domain names(comma separated) for authentication -athenzDomainNames= - -# When this parameter is not empty, unauthenticated users perform as anonymousUserRole -anonymousUserRole= - -# The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken -# (defaults to "sub" if blank) -tokenAuthClaim= - -# The token audience "claim" name, e.g. "aud", that will be used to get the audience from token. -# If not set, audience will not be verified. -tokenAudienceClaim= - -# The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this. -tokenAudience= - -### --- BookKeeper Client --- ### - -# Authentication plugin to use when connecting to bookies -bookkeeperClientAuthenticationPlugin= - -# BookKeeper auth plugin implementatation specifics parameters name and values -bookkeeperClientAuthenticationParametersName= -bookkeeperClientAuthenticationParameters= - -# Timeout for BK add / read operations -bookkeeperClientTimeoutInSeconds=30 - -# Speculative reads are initiated if a read request doesn't complete within a certain time -# Using a value of 0, is disabling the speculative reads -bookkeeperClientSpeculativeReadTimeoutInMillis=0 - -# Number of channels per bookie -bookkeeperNumberOfChannelsPerBookie=16 - -# Enable bookies health check. Bookies that have more than the configured number of failure within -# the interval will be quarantined for some time. 
During this period, new ledgers won't be created -# on these bookies -bookkeeperClientHealthCheckEnabled=true -bookkeeperClientHealthCheckIntervalSeconds=60 -bookkeeperClientHealthCheckErrorThresholdPerInterval=5 -bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800 - -#bookie quarantine ratio to avoid all clients quarantine the high pressure bookie servers at the same time -bookkeeperClientQuarantineRatio=1.0 - -# Enable rack-aware bookie selection policy. BK will chose bookies from different racks when -# forming a new bookie ensemble -# This parameter related to ensemblePlacementPolicy in conf/bookkeeper.conf, if enabled, ensemblePlacementPolicy -# should be set to org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy -bookkeeperClientRackawarePolicyEnabled=true - -# Enable region-aware bookie selection policy. BK will chose bookies from -# different regions and racks when forming a new bookie ensemble. -# If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored -# This parameter related to ensemblePlacementPolicy in conf/bookkeeper.conf, if enabled, ensemblePlacementPolicy -# should be set to org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy -bookkeeperClientRegionawarePolicyEnabled=false - -# Minimum number of racks per write quorum. BK rack-aware bookie selection policy will try to -# get bookies from at least 'bookkeeperClientMinNumRacksPerWriteQuorum' racks for a write quorum. -bookkeeperClientMinNumRacksPerWriteQuorum=1 - -# Enforces rack-aware bookie selection policy to pick bookies from 'bookkeeperClientMinNumRacksPerWriteQuorum' -# racks for a writeQuorum. -# If BK can't find bookie then it would throw BKNotEnoughBookiesException instead of picking random one. -bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false - -# Enable/disable reordering read sequence on reading entries. -bookkeeperClientReorderReadSequenceEnabled=false - -# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie -# outside the specified groups will not be used by the broker -bookkeeperClientIsolationGroups= - -# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't -# have enough bookie available. -bookkeeperClientSecondaryIsolationGroups= - -# Minimum bookies that should be available as part of bookkeeperClientIsolationGroups -# else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list. -bookkeeperClientMinAvailableBookiesInIsolationGroups= - -# Set the client security provider factory class name. -# Default: org.apache.bookkeeper.tls.TLSContextFactory -bookkeeperTLSProviderFactoryClass=org.apache.bookkeeper.tls.TLSContextFactory - -# Enable tls authentication with bookie -bookkeeperTLSClientAuthentication=false - -# Supported type: PEM, JKS, PKCS12. Default value: PEM -bookkeeperTLSKeyFileType=PEM - -#Supported type: PEM, JKS, PKCS12. Default value: PEM -bookkeeperTLSTrustCertTypes=PEM - -# Path to file containing keystore password, if the client keystore is password protected. -bookkeeperTLSKeyStorePasswordPath= - -# Path to file containing truststore password, if the client truststore is password protected. -bookkeeperTLSTrustStorePasswordPath= - -# Path for the TLS private key file -bookkeeperTLSKeyFilePath= - -# Path for the TLS certificate file -bookkeeperTLSCertificateFilePath= - -# Path for the trusted TLS certificate file -bookkeeperTLSTrustCertsFilePath= - -# Enable/disable disk weight based placement. 
Default is false -bookkeeperDiskWeightBasedPlacementEnabled=false - -# Set the interval to check the need for sending an explicit LAC -# A value of '0' disables sending any explicit LACs. Default is 0. -bookkeeperExplicitLacIntervalInMills=0 - -# Use older Bookkeeper wire protocol with bookie -bookkeeperUseV2WireProtocol=true - -# Expose bookkeeper client managed ledger stats to prometheus. default is false -# bookkeeperClientExposeStatsToPrometheus=false - -### --- Managed Ledger --- ### - -# Number of bookies to use when creating a ledger -managedLedgerDefaultEnsembleSize=1 - -# Number of copies to store for each message -managedLedgerDefaultWriteQuorum=1 - -# Number of guaranteed copies (acks to wait before write is complete) -managedLedgerDefaultAckQuorum=1 - -# How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). -# Default is 60 seconds -managedLedgerCursorPositionFlushSeconds=60 - -# Default type of checksum to use when writing to BookKeeper. Default is "CRC32C" -# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum). -managedLedgerDigestType=CRC32C - -# Number of threads to be used for managed ledger tasks dispatching -managedLedgerNumWorkerThreads=4 - -# Number of threads to be used for managed ledger scheduled tasks -managedLedgerNumSchedulerThreads=4 - -# Amount of memory to use for caching data payload in managed ledger. This memory -# is allocated from JVM direct memory and it's shared across all the topics -# running in the same broker. By default, uses 1/5th of available direct memory -managedLedgerCacheSizeMB= - -# Whether we should make a copy of the entry payloads when inserting in cache -managedLedgerCacheCopyEntries=false - -# Threshold to which bring down the cache level when eviction is triggered -managedLedgerCacheEvictionWatermark=0.9 - -# Configure the cache eviction frequency for the managed ledger cache (evictions/sec) -managedLedgerCacheEvictionFrequency=100.0 - -# All entries that have stayed in cache for more than the configured time, will be evicted -managedLedgerCacheEvictionTimeThresholdMillis=1000 - -# Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged' -# and thus should be set as inactive. -managedLedgerCursorBackloggedThreshold=1000 - -# Rate limit the amount of writes generated by consumer acking the messages -managedLedgerDefaultMarkDeleteRateLimit=0.1 - -# Max number of entries to append to a ledger before triggering a rollover -# A ledger rollover is triggered on these conditions -# * Either the max rollover time has been reached -# * or max entries have been written to the ledged and at least min-time -# has passed -managedLedgerMaxEntriesPerLedger=50000 - -# Minimum time between ledger rollover for a topic -managedLedgerMinLedgerRolloverTimeMinutes=10 - -# Maximum time before forcing a ledger rollover for a topic -managedLedgerMaxLedgerRolloverTimeMinutes=240 - -# Max number of entries to append to a cursor ledger -managedLedgerCursorMaxEntriesPerLedger=50000 - -# Max time before triggering a rollover on a cursor ledger -managedLedgerCursorRolloverTimeInSeconds=14400 - -# Maximum ledger size before triggering a rollover for a topic (MB) -managedLedgerMaxSizePerLedgerMbytes=2048 - -# Max number of "acknowledgment holes" that are going to be persistently stored. -# When acknowledging out of order, a consumer will leave holes that are supposed -# to be quickly filled by acking all the messages. 
The information of which -# messages are acknowledged is persisted by compressing in "ranges" of messages -# that were acknowledged. After the max number of ranges is reached, the information -# will only be tracked in memory and messages will be redelivered in case of -# crashes. -managedLedgerMaxUnackedRangesToPersist=10000 - -# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher -# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into -# zookeeper. -managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 - -# Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets -# corrupted at bookkeeper and managed-cursor is stuck at that ledger. -autoSkipNonRecoverableData=false - -# operation timeout while updating managed-ledger metadata. -managedLedgerMetadataOperationsTimeoutSeconds=60 - -# Read entries timeout when broker tries to read messages from bookkeeper. -managedLedgerReadEntryTimeoutSeconds=0 - -# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it). -managedLedgerAddEntryTimeoutSeconds=0 - -# New entries check delay for the cursor under the managed ledger. -# If no new messages in the topic, the cursor will try to check again after the delay time. -# For consumption latency sensitive scenario, can set to a smaller value or set to 0. -# Of course, use a smaller value may degrade consumption throughput. Default is 10ms. -managedLedgerNewEntriesCheckDelayInMillis=10 - -# Use Open Range-Set to cache unacked messages -managedLedgerUnackedRangesOpenCacheSetEnabled=true - -# Managed ledger prometheus stats latency rollover seconds (default: 60s) -managedLedgerPrometheusStatsLatencyRolloverSeconds=60 - -# Whether trace managed ledger task execution time -managedLedgerTraceTaskExecution=true - -### --- Load balancer --- ### - -loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager - -# Enable load balancer -loadBalancerEnabled=false - -# Percentage of change to trigger load report update -loadBalancerReportUpdateThresholdPercentage=10 - -# maximum interval to update load report -loadBalancerReportUpdateMaxIntervalMinutes=15 - -# Frequency of report to collect -loadBalancerHostUsageCheckIntervalMinutes=1 - -# Load shedding interval. 
Broker periodically checks whether some traffic should be offload from -# some over-loaded broker to other under-loaded brokers -loadBalancerSheddingIntervalMinutes=1 - -# Prevent the same topics to be shed and moved to other broker more that once within this timeframe -loadBalancerSheddingGracePeriodMinutes=30 - -# Usage threshold to allocate max number of topics to broker -loadBalancerBrokerMaxTopics=50000 - -# Interval to flush dynamic resource quota to ZooKeeper -loadBalancerResourceQuotaUpdateIntervalMinutes=15 - -# enable/disable namespace bundle auto split -loadBalancerAutoBundleSplitEnabled=true - -# enable/disable automatic unloading of split bundles -loadBalancerAutoUnloadSplitBundlesEnabled=true - -# maximum topics in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxTopics=1000 - -# maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxSessions=1000 - -# maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxMsgRate=30000 - -# maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxBandwidthMbytes=100 - -# maximum number of bundles in a namespace -loadBalancerNamespaceMaximumBundles=128 - -# The broker resource usage threshold. -# When the broker resource usage is gratter than the pulsar cluster average resource usge, -# the threshold shedder will be triggered to offload bundles from the broker. -# It only take effect in ThresholdSheddler strategy. -loadBalancerBrokerThresholdShedderPercentage=10 - -# When calculating new resource usage, the history usage accounts for. -# It only take effect in ThresholdSheddler strategy. -loadBalancerHistoryResourcePercentage=0.9 - -# The BandWithIn usage weight when calculating new resourde usage. -# It only take effect in ThresholdShedder strategy. -loadBalancerBandwithInResourceWeight=1.0 - -# The BandWithOut usage weight when calculating new resourde usage. -# It only take effect in ThresholdShedder strategy. -loadBalancerBandwithOutResourceWeight=1.0 - -# The CPU usage weight when calculating new resourde usage. -# It only take effect in ThresholdShedder strategy. -loadBalancerCPUResourceWeight=1.0 - -# The heap memory usage weight when calculating new resourde usage. -# It only take effect in ThresholdShedder strategy. -loadBalancerMemoryResourceWeight=1.0 - -# The direct memory usage weight when calculating new resourde usage. -# It only take effect in ThresholdShedder strategy. -loadBalancerDirectMemoryResourceWeight=1.0 - -# Bundle unload minimum throughput threshold (MB), avoding bundle unload frequently. -# It only take effect in ThresholdShedder strategy. -loadBalancerBundleUnloadMinThroughputThreshold=10 - -### --- Replication --- ### - -# Enable replication metrics -replicationMetricsEnabled=true - -# Max number of connections to open for each broker in a remote cluster -# More connections host-to-host lead to better throughput over high-latency -# links. 
-replicationConnectionsPerBroker=16 - -# Replicator producer queue size -replicationProducerQueueSize=1000 - -# Duration to check replication policy to avoid replicator inconsistency -# due to missing ZooKeeper watch (disable with value 0) -replicationPolicyCheckDurationSeconds=600 - -# Default message retention time -defaultRetentionTimeInMinutes=0 - -# Default retention size -defaultRetentionSizeInMB=0 - -# How often to check whether the connections are still alive -keepAliveIntervalSeconds=30 - -### --- WebSocket --- ### - -# Enable the WebSocket API service in broker -webSocketServiceEnabled=true - -# Number of IO threads in Pulsar Client used in WebSocket proxy -webSocketNumIoThreads=8 - -# Number of connections per Broker in Pulsar Client used in WebSocket proxy -webSocketConnectionsPerBroker=8 - -# Time in milliseconds that idle WebSocket session times out -webSocketSessionIdleTimeoutMillis=300000 - -# The maximum size of a text message during parsing in WebSocket proxy -webSocketMaxTextFrameSize=1048576 - -### --- Metrics --- ### - -# Enable topic level metrics -exposeTopicLevelMetricsInPrometheus=true - -# Classname of Pluggable JVM GC metrics logger that can log GC specific metrics -# jvmGCMetricsLoggerClassName= - -### --- Broker Web Stats --- ### - -# Enable topic level metrics -exposePublisherStats=true - -# Enable expose the precise backlog stats. -# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be -# inaccurate. Default is false. -exposePreciseBacklogInPrometheus=false - -### --- Deprecated config variables --- ### - -# Deprecated. Use configurationStoreServers -globalZookeeperServers= - -# Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds -brokerServicePurgeInactiveFrequencyInSeconds=60 - -### --- BookKeeper Configuration --- ##### - -ledgerStorageClass=org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - -# The maximum netty frame size in bytes. Any message received larger than this will be rejected. The default value is -# 5MB. -nettyMaxFrameSizeBytes=5253120 - -# Size of Write Cache. Memory is allocated from JVM direct memory. -# Write cache is used to buffer entries before flushing into the entry log -# For good performance, it should be big enough to hold a substantial amount -# of entries in the flush interval -# By default it will be allocated to 1/4th of the available direct memory -dbStorage_writeCacheMaxSizeMb= - -# Size of Read cache. Memory is allocated from JVM direct memory. -# This read cache is pre-filled doing read-ahead whenever a cache miss happens -# By default it will be allocated to 1/4th of the available direct memory -dbStorage_readAheadCacheMaxSizeMb= - -# How many entries to pre-fill in cache after a read cache miss -dbStorage_readAheadCacheBatchSize=1000 - -flushInterval=60000 - -## RocksDB specific configurations -## DbLedgerStorage uses RocksDB to store the indexes from -## (ledgerId, entryId) -> (entryLog, offset) - -# Size of RocksDB block-cache. 
For best performance, this cache -# should be big enough to hold a significant portion of the index -# database which can reach ~2GB in some cases -# Default is to use 10% of the direct memory size -dbStorage_rocksDB_blockCacheSize= - -# Other RocksDB specific tunables -dbStorage_rocksDB_writeBufferSizeMB=4 -dbStorage_rocksDB_sstSizeInMB=4 -dbStorage_rocksDB_blockSize=4096 -dbStorage_rocksDB_bloomFilterBitsPerKey=10 -dbStorage_rocksDB_numLevels=-1 -dbStorage_rocksDB_numFilesInLevel0=4 -dbStorage_rocksDB_maxSizeInLevel1MB=256 - -# Maximum latency to impose on a journal write to achieve grouping -journalMaxGroupWaitMSec=1 - -# Should the data be fsynced on journal before acknowledgment. -journalSyncData=false - - -# For each ledger dir, maximum disk space which can be used. -# Default is 0.95f. i.e. 95% of disk can be used at most after which nothing will -# be written to that partition. If all ledger dir partions are full, then bookie -# will turn to readonly mode if 'readOnlyModeEnabled=true' is set, else it will -# shutdown. -# Valid values should be in between 0 and 1 (exclusive). -diskUsageThreshold=0.99 - -# The disk free space low water mark threshold. -# Disk is considered full when usage threshold is exceeded. -# Disk returns back to non-full state when usage is below low water mark threshold. -# This prevents it from going back and forth between these states frequently -# when concurrent writes and compaction are happening. This also prevent bookie from -# switching frequently between read-only and read-writes states in the same cases. -diskUsageWarnThreshold=0.99 - -# Whether the bookie allowed to use a loopback interface as its primary -# interface(i.e. the interface it uses to establish its identity)? -# By default, loopback interfaces are not allowed as the primary -# interface. -# Using a loopback interface as the primary interface usually indicates -# a configuration error. For example, its fairly common in some VPS setups -# to not configure a hostname, or to have the hostname resolve to -# 127.0.0.1. If this is the case, then all bookies in the cluster will -# establish their identities as 127.0.0.1:3181, and only one will be able -# to join the cluster. For VPSs configured like this, you should explicitly -# set the listening interface. -allowLoopback=true - -# How long the interval to trigger next garbage collection, in milliseconds -# Since garbage collection is running in background, too frequent gc -# will heart performance. It is better to give a higher number of gc -# interval if there is enough disk capacity. -gcWaitTime=300000 - -# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) -allowAutoTopicCreation=true - -# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) -allowAutoTopicCreationType=non-partitioned - -# Enable subscription auto creation if new consumer connected (disable auto creation with value false) -allowAutoSubscriptionCreation=true - -# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is -# partitioned. -defaultNumPartitions=1 - -### --- Transaction config variables --- ### -transactionMetadataStoreProviderClassName=\ - org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider
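The test-bootstrap pattern this commit adopts, reduced to a minimal self-contained sketch. It relies only on calls that appear in the diff above (Testcontainers' PulsarContainer with withStartupTimeout(), the getHttpServiceUrl()/getPulsarBrokerUrl() accessors, and PulsarAdmin's createPartitionedTopic() and getPartitionedStats().msgInCounter); the class name, topic name, record count, and poll/timeout values are illustrative assumptions, not the actual test code.

import java.time.Duration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;

public final class PulsarContainerBootstrapSketch {
  public static void main(String[] args)
      throws Exception {
    // Start a throwaway Pulsar broker in Docker instead of an in-process standalone cluster.
    try (PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.7.2"))
        .withStartupTimeout(Duration.ofMinutes(5))) {
      pulsar.start();

      // The admin client talks to the container's mapped HTTP port; consumers would use
      // pulsar.getPulsarBrokerUrl() for the binary-protocol endpoint instead.
      try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build()) {
        String topic = "test-topic";   // illustrative topic name
        long expectedRecords = 1000;   // illustrative record count
        admin.topics().createPartitionedTopic(topic, 1);

        // ... publish expectedRecords messages via a PulsarClient here ...

        // Poll broker-side stats until every published record is accounted for,
        // mirroring the test's waitForMessagesToPublish()/getNumberOfEntries() pair.
        long deadline = System.currentTimeMillis() + Duration.ofMinutes(1).toMillis();
        while (admin.topics().getPartitionedStats(topic, false).msgInCounter < expectedRecords) {
          if (System.currentTimeMillis() > deadline) {
            throw new IllegalStateException("Not all records were published in time");
          }
          Thread.sleep(2000L);
        }
        // All records are visible to the broker; it is now safe to start consuming.
      }
    }
  }
}

The polling step is what the commit title promises: the consumer assertions only run once the broker's own counters confirm every record was pushed, instead of sleeping for a fixed interval and hoping the publish completed.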