This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 476415f0c7 Fix flaky kafka consumer tests (#14458) 476415f0c7 is described below commit 476415f0c740accad550f476832347712f452150 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Nov 14 11:23:23 2024 -0800 Fix flaky kafka consumer tests (#14458) --- .../pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java | 3 +-- .../pinot/plugin/stream/kafka30/utils/MiniKafkaCluster.java | 4 +--- .../apache/pinot/plugin/stream/kafka/utils/EmbeddedZooKeeper.java | 8 +++----- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java index 26c6b73f85..56be7c1f57 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java @@ -49,10 +49,9 @@ public final class MiniKafkaCluster implements Closeable { private final String _kafkaServerAddress; private final AdminClient _adminClient; - @SuppressWarnings({"rawtypes", "unchecked"}) public MiniKafkaCluster(String brokerId) throws IOException, InterruptedException { - _zkServer = new EmbeddedZooKeeper(); + _zkServer = new EmbeddedZooKeeper(new File(TEMP_DIR, "zk")); int kafkaServerPort = getAvailablePort(); KafkaConfig kafkaBrokerConfig = new KafkaConfig(createBrokerConfig(brokerId, kafkaServerPort)); _kafkaServer = new KafkaServer(kafkaBrokerConfig, Time.SYSTEM, Option.empty(), false); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/utils/MiniKafkaCluster.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/utils/MiniKafkaCluster.java index b107bd170c..dd5a6997e6 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/utils/MiniKafkaCluster.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/utils/MiniKafkaCluster.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.pinot.plugin.stream.kafka30.utils; import java.io.Closeable; @@ -51,10 +50,9 @@ public final class MiniKafkaCluster implements Closeable { private final String _kafkaServerAddress; private final AdminClient _adminClient; - @SuppressWarnings({"rawtypes", "unchecked"}) public MiniKafkaCluster(String brokerId) throws IOException, InterruptedException { - _zkServer = new EmbeddedZooKeeper(); + _zkServer = new EmbeddedZooKeeper(new File(TEMP_DIR, "zk")); int kafkaServerPort = getAvailablePort(); KafkaConfig kafkaBrokerConfig = new KafkaConfig(createBrokerConfig(brokerId, kafkaServerPort)); _kafkaServer = new KafkaServer(kafkaBrokerConfig, Time.SYSTEM, Option.empty(), false); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/utils/EmbeddedZooKeeper.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/utils/EmbeddedZooKeeper.java index fccde3ef35..c633f6d06e 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/utils/EmbeddedZooKeeper.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/utils/EmbeddedZooKeeper.java @@ -22,22 +22,21 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; -import org.apache.commons.io.FileUtils; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; public class EmbeddedZooKeeper implements Closeable { - private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "EmbeddedZooKeeper"); private static final int TICK_TIME = 500; private final NIOServerCnxnFactory _factory; private final String _zkAddress; - public EmbeddedZooKeeper() + public EmbeddedZooKeeper(File workingDir) throws IOException, InterruptedException { _factory = new NIOServerCnxnFactory(); - ZooKeeperServer zkServer = new ZooKeeperServer(new File(TEMP_DIR, "data"), new File(TEMP_DIR, "log"), TICK_TIME); + ZooKeeperServer zkServer = + new ZooKeeperServer(new File(workingDir, "data"), new File(workingDir, "log"), TICK_TIME); _factory.configure(new InetSocketAddress("localhost", 0), 0); _factory.startup(zkServer); _zkAddress = "localhost:" + zkServer.getClientPort(); @@ -51,6 +50,5 @@ public class EmbeddedZooKeeper implements Closeable { public void close() throws IOException { _factory.shutdown(); - FileUtils.deleteDirectory(TEMP_DIR); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org