This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0c4fb58 Add the isolation level config to kafka ingestion to support Kafka transactions (#6580) 0c4fb58 is described below commit 0c4fb588d96a2c26241969bd1d00e2d5243882e3 Author: Yupeng Fu <yupe...@users.noreply.github.com> AuthorDate: Mon Feb 15 18:51:38 2021 -0800 Add the isolation level config to kafka ingestion to support Kafka transactions (#6580) Added the support of `isolation.level` in Kafka consumer (2.0) to ingest transactionally committed messages only (i.e. `read_committed`). --- .../tests/BaseClusterIntegrationTest.java | 11 +++- .../tests/ClusterIntegrationTestUtils.java | 60 ++++++++++++++++++++ ...tlyOnceKafkaRealtimeClusterIntegrationTest.java | 65 ++++++++++++++++++++++ .../KafkaPartitionLevelConnectionHandler.java | 3 + .../kafka20/KafkaPartitionLevelStreamConfig.java | 17 ++++++ .../kafka20/KafkaStreamLevelConsumerManager.java | 1 + .../KafkaPartitionLevelStreamConfigTest.java | 34 +++++++---- .../stream/kafka/KafkaStreamConfigProperties.java | 3 + .../pinot/tools/utils/KafkaStarterUtils.java | 8 +++ 9 files changed, 191 insertions(+), 11 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index 4e0ab3a..81f448b 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -125,6 +125,10 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { return false; } + protected boolean useKafkaTransaction() { + return false; + } + protected String getStreamConsumerFactoryClassName() { return KafkaStarterUtils.KAFKA_STREAM_CONSUMER_FACTORY_CLASS_NAME; } @@ -307,6 +311,11 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { streamConfigMap.put(KafkaStreamConfigProperties .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST), KafkaStarterUtils.DEFAULT_KAFKA_BROKER); + if (useKafkaTransaction()) { + streamConfigMap.put(KafkaStreamConfigProperties + .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL), + KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_COMMITTED); + } } else { // HLC streamConfigMap @@ -435,7 +444,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { } protected List<File> getAllAvroFiles() - throws Exception { + throws Exception { // Unpack the Avro files int numSegments = unpackAvroData(_tempDir).size(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java index 0deba16..1ecd3de 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java @@ -55,6 +55,9 @@ import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.util.Utf8; import org.apache.commons.lang3.math.NumberUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.pinot.client.Request; import org.apache.pinot.client.ResultSetGroup; import org.apache.pinot.common.request.BrokerRequest; @@ -346,6 +349,63 @@ public class ClusterIntegrationTestUtils { } /** + * Push the records from the given Avro files into a Kafka stream. + * + * @param avroFiles List of Avro files + * @param kafkaBroker Kafka broker config + * @param kafkaTopic Kafka topic + * @param maxNumKafkaMessagesPerBatch Maximum number of Kafka messages per batch + * @param header Optional Kafka message header + * @param partitionColumn Optional partition column + * @param commit if the transaction commits or aborts + * @throws Exception + */ + public static void pushAvroIntoKafkaWithTransaction(List<File> avroFiles, String kafkaBroker, String kafkaTopic, + int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn, boolean commit) + throws Exception { + Properties props = new Properties(); + props.put("bootstrap.servers", kafkaBroker); + props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("request.required.acks", "1"); + props.put("transactional.id", "test-transaction"); + props.put("transaction.state.log.replication.factor", "2"); + + Producer<byte[], byte[]> producer = new KafkaProducer<>(props); + // initiate transaction. + producer.initTransactions(); + producer.beginTransaction(); + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) { + for (File avroFile : avroFiles) { + try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) { + BinaryEncoder binaryEncoder = new EncoderFactory().directBinaryEncoder(outputStream, null); + GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(reader.getSchema()); + for (GenericRecord genericRecord : reader) { + outputStream.reset(); + if (header != null && 0 < header.length) { + outputStream.write(header); + } + datumWriter.write(genericRecord, binaryEncoder); + binaryEncoder.flush(); + + byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(System.currentTimeMillis()) + : (genericRecord.get(partitionColumn)).toString().getBytes(); + byte[] bytes = outputStream.toByteArray(); + ProducerRecord<byte[], byte[]> record = new ProducerRecord(kafkaTopic, keyBytes, bytes); + producer.send(record); + } + } + } + } + if (commit) { + producer.commitTransaction(); + } else { + producer.abortTransaction(); + } + } + + + /** * Push random generated * * @param avroFile Sample Avro file used to extract the Avro schema diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java new file mode 100644 index 0000000..430452e --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.segment.ReadMode; +import org.apache.pinot.controller.ControllerConf; + + +public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest { + + @Override + protected boolean useLlc() { + return true; + } + + @Override + protected boolean useKafkaTransaction() { + return true; + } + + @Override + protected String getLoadMode() { + return ReadMode.mmap.name(); + } + + @Override + public void startController() { + Map<String, Object> properties = getDefaultControllerConfiguration(); + + properties.put(ControllerConf.ALLOW_HLC_TABLES, false); + startController(properties); + } + + @Override + protected void pushAvroIntoKafka(List<File> avroFiles) + throws Exception { + // the first transaction of kafka messages are aborted + ClusterIntegrationTestUtils + .pushAvroIntoKafkaWithTransaction(avroFiles, "localhost:" + getBaseKafkaPort(), getKafkaTopic(), + getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), false); + // the second transaction of kafka messages are committed + ClusterIntegrationTestUtils + .pushAvroIntoKafkaWithTransaction(avroFiles, "localhost:" + getBaseKafkaPort(), getKafkaTopic(), + getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), true); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java index f051ee1..c10bfd9 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java @@ -56,6 +56,9 @@ public abstract class KafkaPartitionLevelConnectionHandler { consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts()); consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); + if (_config.getKafkaIsolationLevel() != null) { + consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel()); + } _consumer = new KafkaConsumer<>(consumerProp); _topicPartition = new TopicPartition(_topic, _partition); _consumer.assign(Collections.singletonList(_topicPartition)); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfig.java index 9be2b15..d4c54f6 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfig.java @@ -37,6 +37,7 @@ public class KafkaPartitionLevelStreamConfig { private final int _kafkaSocketTimeout; private final int _kafkaFetcherSizeBytes; private final int _kafkaFetcherMinBytes; + private final String _kafkaIsolationLevel; private final Map<String, String> _streamConfigMap; /** @@ -58,6 +59,8 @@ public class KafkaPartitionLevelStreamConfig { .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES); String fetcherMinBytesKey = KafkaStreamConfigProperties .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES); + String isolationLevelKey = KafkaStreamConfigProperties + .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL); _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey); _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, llcBufferKey, KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT); @@ -66,6 +69,16 @@ public class KafkaPartitionLevelStreamConfig { _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, fetcherSizeKey, _kafkaBufferSize); _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, fetcherMinBytesKey, KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT); + + _kafkaIsolationLevel = _streamConfigMap.get(isolationLevelKey); + if (_kafkaIsolationLevel != null) { + Preconditions.checkArgument( + _kafkaIsolationLevel.equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_COMMITTED) + || _kafkaIsolationLevel + .equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED), + String.format("Unrecognized Kafka isolation level: %s", _kafkaIsolationLevel)); + } + Preconditions.checkNotNull(_bootstrapHosts, "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer"); } @@ -94,6 +107,10 @@ public class KafkaPartitionLevelStreamConfig { return _kafkaFetcherMinBytes; } + public String getKafkaIsolationLevel() { + return _kafkaIsolationLevel; + } + private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) { String stringValue = configMap.get(key); try { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java index 3373633..2f99984 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java @@ -97,6 +97,7 @@ public class KafkaStreamLevelConsumerManager { consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaStreamLevelStreamConfig.getBootstrapServers()); consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); + if (consumerProp.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) && consumerProp .getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals("smallest")) { consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfigTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfigTest.java index e336965..b56cbb6 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfigTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfigTest.java @@ -33,11 +33,16 @@ public class KafkaPartitionLevelStreamConfigTest { private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer, String socketTimeout) { - return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null); + return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null, null); } private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer, - String socketTimeout, String fetcherSize, String fetcherMinBytes) { + String socketTimeout, String isolationLevel) { + return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null, isolationLevel); + } + + private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer, + String socketTimeout, String fetcherSize, String fetcherMinBytes, String isolationLevel) { Map<String, String> streamConfigMap = new HashMap<>(); String streamType = "kafka"; String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString(); @@ -69,10 +74,19 @@ public class KafkaPartitionLevelStreamConfigTest { if (fetcherMinBytes != null) { streamConfigMap.put("stream.kafka.fetcher.minBytes", fetcherMinBytes); } + if (isolationLevel != null) { + streamConfigMap.put("stream.kafka.isolation.level", isolationLevel); + } return new KafkaPartitionLevelStreamConfig(new StreamConfig(tableNameWithType, streamConfigMap)); } @Test + public void testGetKafkaIsolationLevel() { + KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "", "", "", "read_committed"); + Assert.assertEquals("read_committed", config.getKafkaIsolationLevel()); + } + + @Test public void testGetKafkaTopicName() { KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "", "", ""); Assert.assertEquals("topic", config.getKafkaTopicName()); @@ -127,38 +141,38 @@ public class KafkaPartitionLevelStreamConfigTest { @Test public void testGetFetcherSize() { // test default - KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null); + KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null, null); Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, config.getKafkaFetcherSizeBytes()); - config = getStreamConfig("topic", "host1", "100", "", "", null); + config = getStreamConfig("topic", "host1", "100", "", "", null, null); Assert.assertEquals(100, config.getKafkaFetcherSizeBytes()); - config = getStreamConfig("topic", "host1", "100", "", "bad value", null); + config = getStreamConfig("topic", "host1", "100", "", "bad value", null, null); Assert.assertEquals(100, config.getKafkaFetcherSizeBytes()); // correct config - config = getStreamConfig("topic", "host1", "100", "", "200", null); + config = getStreamConfig("topic", "host1", "100", "", "200", null, null); Assert.assertEquals(200, config.getKafkaFetcherSizeBytes()); } @Test public void testGetFetcherMinBytes() { // test default - KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null); + KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null, null); Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT, config.getKafkaFetcherMinBytes()); - config = getStreamConfig("topic", "host1", "", "", "", ""); + config = getStreamConfig("topic", "host1", "", "", "", "", null); Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT, config.getKafkaFetcherMinBytes()); - config = getStreamConfig("topic", "host1", "", "", "", "bad value"); + config = getStreamConfig("topic", "host1", "", "", "", "bad value", null); Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT, config.getKafkaFetcherMinBytes()); // correct config - config = getStreamConfig("topic", "host1", "", "", "", "100"); + config = getStreamConfig("topic", "host1", "", "", "", "100", null); Assert.assertEquals(100, config.getKafkaFetcherMinBytes()); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java index 042b30c..45637c3 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java @@ -59,6 +59,9 @@ public class KafkaStreamConfigProperties { public static final String KAFKA_FETCHER_SIZE_BYTES = "kafka.fetcher.size"; public static final String KAFKA_FETCHER_MIN_BYTES = "kafka.fetcher.minBytes"; public static final int KAFKA_FETCHER_MIN_BYTES_DEFAULT = 100000; + public static final String KAFKA_ISOLATION_LEVEL = "kafka.isolation.level"; + public static final String KAFKA_ISOLATION_LEVEL_READ_COMMITTED = "read_committed"; + public static final String KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED = "read_uncommitted"; } public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop"; diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KafkaStarterUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KafkaStarterUtils.java index 0f7eb4e..52303fc 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KafkaStarterUtils.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KafkaStarterUtils.java @@ -60,6 +60,10 @@ public class KafkaStarterUtils { // Enable topic deletion by default for integration tests configureTopicDeletion(configuration, true); + // set the transaction state replication factor + configureTransactionStateLogReplicationFactor(configuration, (short) 1); + configuration.put("transaction.state.log.min.isr", 1); + // Set host name configureHostName(configuration, "localhost"); configureOffsetsTopicReplicationFactor(configuration, (short) 1); @@ -75,6 +79,10 @@ public class KafkaStarterUtils { configuration.put("offsets.topic.replication.factor", replicationFactor); } + public static void configureTransactionStateLogReplicationFactor(Properties configuration, short replicationFactor) { + configuration.put("transaction.state.log.replication.factor", replicationFactor); + } + public static void configureTopicDeletion(Properties configuration, boolean topicDeletionEnabled) { configuration.put("delete.topic.enable", Boolean.toString(topicDeletionEnabled)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org