This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 476415f0c7 Fix flaky kafka consumer tests (#14458)
476415f0c7 is described below

commit 476415f0c740accad550f476832347712f452150
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Nov 14 11:23:23 2024 -0800

    Fix flaky kafka consumer tests (#14458)
---
 .../pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java       | 3 +--
 .../pinot/plugin/stream/kafka30/utils/MiniKafkaCluster.java       | 4 +---
 .../apache/pinot/plugin/stream/kafka/utils/EmbeddedZooKeeper.java | 8 +++-----
 3 files changed, 5 insertions(+), 10 deletions(-)

diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
index 26c6b73f85..56be7c1f57 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
@@ -49,10 +49,9 @@ public final class MiniKafkaCluster implements Closeable {
   private final String _kafkaServerAddress;
   private final AdminClient _adminClient;
 
-  @SuppressWarnings({"rawtypes", "unchecked"})
   public MiniKafkaCluster(String brokerId)
       throws IOException, InterruptedException {
-    _zkServer = new EmbeddedZooKeeper();
+    _zkServer = new EmbeddedZooKeeper(new File(TEMP_DIR, "zk"));
     int kafkaServerPort = getAvailablePort();
     KafkaConfig kafkaBrokerConfig = new 
KafkaConfig(createBrokerConfig(brokerId, kafkaServerPort));
     _kafkaServer = new KafkaServer(kafkaBrokerConfig, Time.SYSTEM, 
Option.empty(), false);
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/utils/MiniKafkaCluster.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/utils/MiniKafkaCluster.java
index b107bd170c..dd5a6997e6 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/utils/MiniKafkaCluster.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/utils/MiniKafkaCluster.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.pinot.plugin.stream.kafka30.utils;
 
 import java.io.Closeable;
@@ -51,10 +50,9 @@ public final class MiniKafkaCluster implements Closeable {
   private final String _kafkaServerAddress;
   private final AdminClient _adminClient;
 
-  @SuppressWarnings({"rawtypes", "unchecked"})
   public MiniKafkaCluster(String brokerId)
       throws IOException, InterruptedException {
-    _zkServer = new EmbeddedZooKeeper();
+    _zkServer = new EmbeddedZooKeeper(new File(TEMP_DIR, "zk"));
     int kafkaServerPort = getAvailablePort();
     KafkaConfig kafkaBrokerConfig = new 
KafkaConfig(createBrokerConfig(brokerId, kafkaServerPort));
     _kafkaServer = new KafkaServer(kafkaBrokerConfig, Time.SYSTEM, 
Option.empty(), false);
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/utils/EmbeddedZooKeeper.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/utils/EmbeddedZooKeeper.java
index fccde3ef35..c633f6d06e 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/utils/EmbeddedZooKeeper.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/utils/EmbeddedZooKeeper.java
@@ -22,22 +22,21 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import org.apache.commons.io.FileUtils;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
 
 
 public class EmbeddedZooKeeper implements Closeable {
-  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"EmbeddedZooKeeper");
   private static final int TICK_TIME = 500;
 
   private final NIOServerCnxnFactory _factory;
   private final String _zkAddress;
 
-  public EmbeddedZooKeeper()
+  public EmbeddedZooKeeper(File workingDir)
       throws IOException, InterruptedException {
     _factory = new NIOServerCnxnFactory();
-    ZooKeeperServer zkServer = new ZooKeeperServer(new File(TEMP_DIR, "data"), 
new File(TEMP_DIR, "log"), TICK_TIME);
+    ZooKeeperServer zkServer =
+        new ZooKeeperServer(new File(workingDir, "data"), new File(workingDir, 
"log"), TICK_TIME);
     _factory.configure(new InetSocketAddress("localhost", 0), 0);
     _factory.startup(zkServer);
     _zkAddress = "localhost:" + zkServer.getClientPort();
@@ -51,6 +50,5 @@ public class EmbeddedZooKeeper implements Closeable {
   public void close()
       throws IOException {
     _factory.shutdown();
-    FileUtils.deleteDirectory(TEMP_DIR);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to