This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c4fb58  Add the isolation level config to kafka ingestion to support Kafka transactions (#6580)
0c4fb58 is described below

commit 0c4fb588d96a2c26241969bd1d00e2d5243882e3
Author: Yupeng Fu <yupe...@users.noreply.github.com>
AuthorDate: Mon Feb 15 18:51:38 2021 -0800

    Add the isolation level config to kafka ingestion to support Kafka transactions (#6580)
    
    Added support for the `isolation.level` setting in the Kafka consumer (2.0) to ingest only transactionally committed messages (i.e. `read_committed`).
---
 .../tests/BaseClusterIntegrationTest.java          | 11 +++-
 .../tests/ClusterIntegrationTestUtils.java         | 60 ++++++++++++++++++++
 ...tlyOnceKafkaRealtimeClusterIntegrationTest.java | 65 ++++++++++++++++++++++
 .../KafkaPartitionLevelConnectionHandler.java      |  3 +
 .../kafka20/KafkaPartitionLevelStreamConfig.java   | 17 ++++++
 .../kafka20/KafkaStreamLevelConsumerManager.java   |  1 +
 .../KafkaPartitionLevelStreamConfigTest.java       | 34 +++++++----
 .../stream/kafka/KafkaStreamConfigProperties.java  |  3 +
 .../pinot/tools/utils/KafkaStarterUtils.java       |  8 +++
 9 files changed, 191 insertions(+), 11 deletions(-)
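
For context, a minimal sketch of how a realtime table's stream config would opt into the new behavior. The property key and value match the constants added in this commit (see KafkaStreamConfigProperties below); the broker address and the surrounding table-config plumbing are hypothetical placeholders, not part of this change.

    import java.util.HashMap;
    import java.util.Map;

    public class IsolationLevelConfigExample {
      public static void main(String[] args) {
        Map<String, String> streamConfigMap = new HashMap<>();
        streamConfigMap.put("streamType", "kafka");
        // Hypothetical broker address for illustration only.
        streamConfigMap.put("stream.kafka.broker.list", "localhost:19092");
        // New in this commit: only ingest transactionally committed messages.
        streamConfigMap.put("stream.kafka.isolation.level", "read_committed");
        System.out.println(streamConfigMap);
      }
    }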

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 4e0ab3a..81f448b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -125,6 +125,10 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     return false;
   }
 
+  protected boolean useKafkaTransaction() {
+    return false;
+  }
+
   protected String getStreamConsumerFactoryClassName() {
     return KafkaStarterUtils.KAFKA_STREAM_CONSUMER_FACTORY_CLASS_NAME;
   }
@@ -307,6 +311,11 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
       streamConfigMap.put(KafkaStreamConfigProperties
              .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST),
           KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+      if (useKafkaTransaction()) {
+        streamConfigMap.put(KafkaStreamConfigProperties
+                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL),
+            KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_COMMITTED);
+      }
     } else {
       // HLC
       streamConfigMap
@@ -435,7 +444,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   }
 
   protected List<File> getAllAvroFiles()
-          throws Exception {
+      throws Exception {
     // Unpack the Avro files
     int numSegments = unpackAvroData(_tempDir).size();
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 0deba16..1ecd3de 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -55,6 +55,9 @@ import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.util.Utf8;
 import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.pinot.client.Request;
 import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.common.request.BrokerRequest;
@@ -346,6 +349,63 @@ public class ClusterIntegrationTestUtils {
   }
 
   /**
+   * Push the records from the given Avro files into a Kafka stream.
+   *
+   * @param avroFiles List of Avro files
+   * @param kafkaBroker Kafka broker config
+   * @param kafkaTopic Kafka topic
+   * @param maxNumKafkaMessagesPerBatch Maximum number of Kafka messages per batch
+   * @param header Optional Kafka message header
+   * @param partitionColumn Optional partition column
+   * @param commit whether to commit (true) or abort (false) the transaction
+   * @throws Exception
+   */
+  public static void pushAvroIntoKafkaWithTransaction(List<File> avroFiles, String kafkaBroker, String kafkaTopic,
+      int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn, boolean commit)
+      throws Exception {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", kafkaBroker);
+    props.put("key.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+    props.put("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+    props.put("request.required.acks", "1");
+    props.put("transactional.id", "test-transaction");
+    props.put("transaction.state.log.replication.factor", "2");
+
+    Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
+    // initiate transaction.
+    producer.initTransactions();
+    producer.beginTransaction();
+    try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
+      for (File avroFile : avroFiles) {
+        try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
+          BinaryEncoder binaryEncoder = new EncoderFactory().directBinaryEncoder(outputStream, null);
+          GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(reader.getSchema());
+          for (GenericRecord genericRecord : reader) {
+            outputStream.reset();
+            if (header != null && 0 < header.length) {
+              outputStream.write(header);
+            }
+            datumWriter.write(genericRecord, binaryEncoder);
+            binaryEncoder.flush();
+
+            byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(System.currentTimeMillis())
+                : (genericRecord.get(partitionColumn)).toString().getBytes();
+            byte[] bytes = outputStream.toByteArray();
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(kafkaTopic, keyBytes, bytes);
+            producer.send(record);
+          }
+        }
+      }
+    }
+    if (commit) {
+      producer.commitTransaction();
+    } else {
+      producer.abortTransaction();
+    }
+  }
+
+
+  /**
    * Push random generated
    *
    * @param avroFile Sample Avro file used to extract the Avro schema
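
As a counterpart to the transactional producer above, a hedged sketch of what `read_committed` buys on the consumer side; this mirrors the consumer property that KafkaPartitionLevelConnectionHandler sets below, while the broker address, group id, and topic are hypothetical:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedConsumerExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); // hypothetical
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group"); // hypothetical
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // The setting this commit wires through Pinot's stream config: records
        // from aborted transactions are filtered out before delivery.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("example-topic")); // hypothetical
          ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
          for (ConsumerRecord<byte[], byte[]> record : records) {
            System.out.println("committed record at offset " + record.offset());
          }
        }
      }
    }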
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
new file mode 100644
index 0000000..430452e
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.controller.ControllerConf;
+
+
+public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Override
+  protected boolean useKafkaTransaction() {
+    return true;
+  }
+
+  @Override
+  protected String getLoadMode() {
+    return ReadMode.mmap.name();
+  }
+
+  @Override
+  public void startController() {
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+
+    properties.put(ControllerConf.ALLOW_HLC_TABLES, false);
+    startController(properties);
+  }
+
+  @Override
+  protected void pushAvroIntoKafka(List<File> avroFiles)
+      throws Exception {
+    // the first transaction of Kafka messages is aborted
+    ClusterIntegrationTestUtils
+        .pushAvroIntoKafkaWithTransaction(avroFiles, "localhost:" + getBaseKafkaPort(), getKafkaTopic(),
+            getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), false);
+    // the second transaction of Kafka messages is committed
+    ClusterIntegrationTestUtils
+        .pushAvroIntoKafkaWithTransaction(avroFiles, "localhost:" + getBaseKafkaPort(), getKafkaTopic(),
+            getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), true);
+  }
+}
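
A note on the test design: the same Avro files are pushed twice, once in an aborted transaction and once in a committed one. Since the table consumes with `read_committed`, only the second push should become visible, so the inherited RealtimeClusterIntegrationTest assertions (which expect exactly one copy of the data) double as the exactly-once check; under `read_uncommitted` the table would ingest both batches and the counts would not match.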
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index f051ee1..c10bfd9 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -56,6 +56,9 @@ public abstract class KafkaPartitionLevelConnectionHandler {
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
     consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    if (_config.getKafkaIsolationLevel() != null) {
+      consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
+    }
     _consumer = new KafkaConsumer<>(consumerProp);
     _topicPartition = new TopicPartition(_topic, _partition);
     _consumer.assign(Collections.singletonList(_topicPartition));
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfig.java
index 9be2b15..d4c54f6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfig.java
@@ -37,6 +37,7 @@ public class KafkaPartitionLevelStreamConfig {
   private final int _kafkaSocketTimeout;
   private final int _kafkaFetcherSizeBytes;
   private final int _kafkaFetcherMinBytes;
+  private final String _kafkaIsolationLevel;
   private final Map<String, String> _streamConfigMap;
 
   /**
@@ -58,6 +59,8 @@ public class KafkaPartitionLevelStreamConfig {
         .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES);
     String fetcherMinBytesKey = KafkaStreamConfigProperties
         .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES);
+    String isolationLevelKey = KafkaStreamConfigProperties
+        .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL);
     _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey);
     _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, llcBufferKey,
         KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
@@ -66,6 +69,16 @@ public class KafkaPartitionLevelStreamConfig {
     _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, fetcherSizeKey, _kafkaBufferSize);
     _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, fetcherMinBytesKey,
         KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT);
+
+    _kafkaIsolationLevel = _streamConfigMap.get(isolationLevelKey);
+    if (_kafkaIsolationLevel != null) {
+      Preconditions.checkArgument(
+          _kafkaIsolationLevel.equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_COMMITTED)
+              || _kafkaIsolationLevel
+              .equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED),
+          String.format("Unrecognized Kafka isolation level: %s", _kafkaIsolationLevel));
+    }
+
     Preconditions.checkNotNull(_bootstrapHosts,
         "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
   }
@@ -94,6 +107,10 @@ public class KafkaPartitionLevelStreamConfig {
     return _kafkaFetcherMinBytes;
   }
 
+  public String getKafkaIsolationLevel() {
+    return _kafkaIsolationLevel;
+  }
+
   private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
     String stringValue = configMap.get(key);
     try {
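
A quick sketch of the fail-fast validation added above, using Guava's Preconditions directly with the two accepted literals; the config-map wiring is simplified for illustration and the misspelled value is deliberate:

    import com.google.common.base.Preconditions;

    public class IsolationLevelValidationExample {
      public static void main(String[] args) {
        String isolationLevel = "read_comitted"; // misspelled on purpose
        try {
          Preconditions.checkArgument(
              isolationLevel.equals("read_committed") || isolationLevel.equals("read_uncommitted"),
              String.format("Unrecognized Kafka isolation level: %s", isolationLevel));
        } catch (IllegalArgumentException e) {
          // A misconfigured table fails at stream-config construction time,
          // instead of silently falling back to a consumer default.
          System.out.println(e.getMessage());
        }
      }
    }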
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java
index 3373633..2f99984 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java
@@ -97,6 +97,7 @@ public class KafkaStreamLevelConsumerManager {
       consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaStreamLevelStreamConfig.getBootstrapServers());
       consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+
       if (consumerProp.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) && consumerProp
           .getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals("smallest")) {
         consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfigTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfigTest.java
index e336965..b56cbb6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfigTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelStreamConfigTest.java
@@ -33,11 +33,16 @@ public class KafkaPartitionLevelStreamConfigTest {
 
   private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
       String socketTimeout) {
-    return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null);
+    return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null, null);
   }
 
   private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
-      String socketTimeout, String fetcherSize, String fetcherMinBytes) {
+      String socketTimeout, String isolationLevel) {
+    return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null, isolationLevel);
+  }
+
+  private KafkaPartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+      String socketTimeout, String fetcherSize, String fetcherMinBytes, String isolationLevel) {
     Map<String, String> streamConfigMap = new HashMap<>();
     String streamType = "kafka";
     String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
@@ -69,10 +74,19 @@ public class KafkaPartitionLevelStreamConfigTest {
     if (fetcherMinBytes != null) {
       streamConfigMap.put("stream.kafka.fetcher.minBytes", fetcherMinBytes);
     }
+    if (isolationLevel != null) {
+      streamConfigMap.put("stream.kafka.isolation.level", isolationLevel);
+    }
     return new KafkaPartitionLevelStreamConfig(new StreamConfig(tableNameWithType, streamConfigMap));
   }
 
   @Test
+  public void testGetKafkaIsolationLevel() {
+    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "", "", "", "read_committed");
+    Assert.assertEquals("read_committed", config.getKafkaIsolationLevel());
+  }
+
+  @Test
   public void testGetKafkaTopicName() {
     KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "", "", "");
     Assert.assertEquals("topic", config.getKafkaTopicName());
@@ -127,38 +141,38 @@ public class KafkaPartitionLevelStreamConfigTest {
   @Test
   public void testGetFetcherSize() {
     // test default
-    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null);
+    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null, null);
     Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
         config.getKafkaFetcherSizeBytes());
 
-    config = getStreamConfig("topic", "host1", "100", "", "", null);
+    config = getStreamConfig("topic", "host1", "100", "", "", null, null);
     Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
 
-    config = getStreamConfig("topic", "host1", "100", "", "bad value", null);
+    config = getStreamConfig("topic", "host1", "100", "", "bad value", null, null);
     Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
 
     // correct config
-    config = getStreamConfig("topic", "host1", "100", "", "200", null);
+    config = getStreamConfig("topic", "host1", "100", "", "200", null, null);
     Assert.assertEquals(200, config.getKafkaFetcherSizeBytes());
   }
 
   @Test
   public void testGetFetcherMinBytes() {
     // test default
-    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null);
+    KafkaPartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null, null);
     Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
         config.getKafkaFetcherMinBytes());
 
-    config = getStreamConfig("topic", "host1", "", "", "", "");
+    config = getStreamConfig("topic", "host1", "", "", "", "", null);
     Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
         config.getKafkaFetcherMinBytes());
 
-    config = getStreamConfig("topic", "host1", "", "", "", "bad value");
+    config = getStreamConfig("topic", "host1", "", "", "", "bad value", null);
     Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
         config.getKafkaFetcherMinBytes());
 
     // correct config
-    config = getStreamConfig("topic", "host1", "", "", "", "100");
+    config = getStreamConfig("topic", "host1", "", "", "", "100", null);
     Assert.assertEquals(100, config.getKafkaFetcherMinBytes());
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java
index 042b30c..45637c3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java
@@ -59,6 +59,9 @@ public class KafkaStreamConfigProperties {
     public static final String KAFKA_FETCHER_SIZE_BYTES = "kafka.fetcher.size";
     public static final String KAFKA_FETCHER_MIN_BYTES = "kafka.fetcher.minBytes";
     public static final int KAFKA_FETCHER_MIN_BYTES_DEFAULT = 100000;
+    public static final String KAFKA_ISOLATION_LEVEL = "kafka.isolation.level";
+    public static final String KAFKA_ISOLATION_LEVEL_READ_COMMITTED = "read_committed";
+    public static final String KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED = "read_uncommitted";
   }
 
   public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KafkaStarterUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KafkaStarterUtils.java
index 0f7eb4e..52303fc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KafkaStarterUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KafkaStarterUtils.java
@@ -60,6 +60,10 @@ public class KafkaStarterUtils {
     // Enable topic deletion by default for integration tests
     configureTopicDeletion(configuration, true);
 
+    // Set the transaction state log replication factor for the single-broker test cluster
+    configureTransactionStateLogReplicationFactor(configuration, (short) 1);
+    configuration.put("transaction.state.log.min.isr", 1);
+
     // Set host name
     configureHostName(configuration, "localhost");
     configureOffsetsTopicReplicationFactor(configuration, (short) 1);
@@ -75,6 +79,10 @@ public class KafkaStarterUtils {
     configuration.put("offsets.topic.replication.factor", replicationFactor);
   }
 
+  public static void configureTransactionStateLogReplicationFactor(Properties configuration, short replicationFactor) {
+    configuration.put("transaction.state.log.replication.factor", replicationFactor);
+  }
+
   public static void configureTopicDeletion(Properties configuration, boolean topicDeletionEnabled) {
     configuration.put("delete.topic.enable", Boolean.toString(topicDeletionEnabled));
   }
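
A design note on the overrides above, hedged: they are what make transactions work on the single-broker test cluster, since Kafka's broker defaults (transaction.state.log.replication.factor=3, transaction.state.log.min.isr=2) cannot be satisfied by one broker, so the transaction state topic could not be created and producer.initTransactions() would not complete. The producer-side "transaction.state.log.replication.factor" property set in ClusterIntegrationTestUtils above is a broker config that the producer should simply ignore with a warning. A minimal sketch of the equivalent broker setup:

    import java.util.Properties;

    public class SingleBrokerTransactionConfigExample {
      public static void main(String[] args) {
        // Broker properties for a one-node test cluster, mirroring the
        // KafkaStarterUtils defaults configured in this commit.
        Properties brokerConfig = new Properties();
        brokerConfig.put("transaction.state.log.replication.factor", (short) 1);
        brokerConfig.put("transaction.state.log.min.isr", 1);
        System.out.println(brokerConfig);
      }
    }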

