This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d41ec0bfd4 Refactor stream quickstart (#9227)
d41ec0bfd4 is described below

commit d41ec0bfd4282ff2121ecb9b1b36af48c1890c1e
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Wed Aug 17 11:24:34 2022 -0700

    Refactor stream quickstart (#9227)
---
 .../kafka20/server/KafkaDataServerStartable.java   |  31 ++++
 .../org/apache/pinot/tools/GenericQuickstart.java  |  29 ----
 .../apache/pinot/tools/GitHubEventsQuickstart.java |  28 ++--
 .../org/apache/pinot/tools/HybridQuickstart.java   | 165 +++++++--------------
 .../pinot/tools/PartialUpsertQuickStart.java       |  18 +--
 .../org/apache/pinot/tools/QuickStartBase.java     | 164 ++++++++++++++++++++
 .../apache/pinot/tools/QuickstartTableRequest.java |  21 +++
 .../RealtimeComplexTypeHandlingQuickStart.java     |  83 +----------
 .../pinot/tools/RealtimeJsonIndexQuickStart.java   |  92 ++----------
 .../org/apache/pinot/tools/RealtimeQuickStart.java | 102 +++++--------
 .../pinot/tools/RealtimeQuickStartWithMinion.java  | 126 ++--------------
 .../apache/pinot/tools/UpsertJsonQuickStart.java   |  19 +--
 .../org/apache/pinot/tools/UpsertQuickStart.java   |  19 +--
 .../pinot/tools/streams/AirlineDataStream.java     |  13 +-
 .../pinot/tools/streams/MeetupRsvpStream.java      |  18 ++-
 .../pinot/tools/streams/RsvpSourceGenerator.java   |  63 +++++++-
 .../airlineStats_realtime_table_config.json        |  26 ++++
 .../stream/airlineStats/airlineStats_schema.json   |  12 ++
 .../airlineStats_data.avro                         | Bin
 .../airlineStats_data.json                         |   0
 .../{sample_data => rawdata}/airlineStats_data.orc | Bin
 .../meetupRsvp_realtime_table_config.json          |  18 ++-
 .../meetupRsvp_realtime_table_config.json          |  18 ++-
 ...etupRsvpComplexType_realtime_table_config.json} |  41 ++---
 .../meetupRsvpComplexType_schema.json              |  63 ++++++++
 .../meetupRsvpJson_realtime_table_config.json}     |   4 +-
 .../meetupRsvpJson_schema.json}                    |   2 +-
 27 files changed, 575 insertions(+), 600 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
index 42bbd9be8d..9bd13c43ae 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.stream.kafka20.server;
 
+import com.google.common.base.Function;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
@@ -25,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import javax.annotation.Nullable;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import org.I0Itec.zkclient.ZkClient;
@@ -96,10 +98,39 @@ public class KafkaDataServerStartable implements StreamDataServerStartable {
     int partition = (Integer) props.get("partition");
     Collection<NewTopic> topicList = Arrays.asList(new NewTopic(topic, 
partition, (short) 1));
     _adminClient.createTopics(topicList);
+    waitForCondition(new Function<Void, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable Void aVoid) {
+        try {
+          return _adminClient.listTopics().names().get().contains(topic);
+        } catch (Exception e) {
+          LOGGER.warn("Could not fetch Kafka topics", e);
+          return null;
+        }
+      }
+    }, 1000L, 30000, "Kafka topic " + topic + " is not created yet");
   }
 
   @Override
   public int getPort() {
     return _port;
   }
+
+  private static void waitForCondition(Function<Void, Boolean> condition, long 
checkIntervalMs, long timeoutMs,
+      @Nullable String errorMessage) {
+    long endTime = System.currentTimeMillis() + timeoutMs;
+    String errorMessageSuffix = errorMessage != null ? ", error message: " + 
errorMessage : "";
+    while (System.currentTimeMillis() < endTime) {
+      try {
+        if (Boolean.TRUE.equals(condition.apply(null))) {
+          return;
+        }
+        Thread.sleep(checkIntervalMs);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while checking the condition" + 
errorMessageSuffix, e);
+      }
+    }
+    LOGGER.error("Failed to meet condition in " + timeoutMs + "ms" + 
errorMessageSuffix);
+  }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java
index 62fc47d6b3..b4b8f295cd 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java
@@ -20,11 +20,7 @@ package org.apache.pinot.tools;
 
 import java.util.Arrays;
 import java.util.List;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,24 +39,10 @@ import org.slf4j.LoggerFactory;
  */
 public class GenericQuickstart extends Quickstart {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GenericQuickstart.class);
-  private StreamDataServerStartable _kafkaStarter;
-  private ZkStarter.ZookeeperInstance _zookeeperInstance;
 
   public GenericQuickstart() {
   }
 
-  private void startKafka() {
-    _zookeeperInstance = ZkStarter.startLocalZkServer();
-    try {
-      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
-          KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance));
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
-    }
-    _kafkaStarter.start();
-    _kafkaStarter.createTopic("pullRequestMergedEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
-  }
-
   @Override
   public List<String> types() {
     return Arrays.asList("GENERIC");
@@ -92,18 +74,7 @@ public class GenericQuickstart extends Quickstart {
 
   public void execute()
       throws Exception {
-    printStatus(Color.CYAN, "***** Starting Kafka *****");
     startKafka();
-
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-      try {
-        printStatus(Color.GREEN, "***** Shutting down QuickStart cluster 
*****");
-        _kafkaStarter.stop();
-        ZkStarter.stopLocalZkServer(_zookeeperInstance);
-      } catch (Exception e) {
-        LOGGER.error("Caught exception in shutting down QuickStart cluster", 
e);
-      }
-    }));
     super.execute();
   }
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
index 47a1a9417f..8765333b96 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
@@ -26,20 +26,16 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import 
org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.apache.pinot.tools.utils.KinesisStarterUtils;
 import org.apache.pinot.tools.utils.StreamSourceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
-
 
 /**
  * Sets up a demo Pinot cluster with 1 zookeeper, 1 controller, 1 broker and 1 
server
@@ -50,25 +46,12 @@ import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
 public class GitHubEventsQuickstart extends QuickStartBase {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GitHubEventsQuickstart.class);
   private StreamDataServerStartable _serverStarter;
-  private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private String _personalAccessToken;
   private StreamSourceType _sourceType;
 
   public GitHubEventsQuickstart() {
   }
 
-  private void startKafka() {
-    _zookeeperInstance = ZkStarter.startLocalZkServer();
-    try {
-      _serverStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
-          KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance));
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
-    }
-    _serverStarter.start();
-    _serverStarter.createTopic("pullRequestMergedEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
-  }
-
   private void startKinesis() {
     try {
 
@@ -85,6 +68,15 @@ public class GitHubEventsQuickstart extends QuickStartBase {
     Properties topicProperties = new Properties();
     topicProperties.put(KinesisStarterUtils.NUM_SHARDS, 3);
     _serverStarter.createTopic("pullRequestMergedEvents", topicProperties);
+
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        printStatus(Color.GREEN, "***** Shutting down Kinesis *****");
+        _serverStarter.stop();
+      } catch (Exception e) {
+        LOGGER.error("Caught exception in shutting down Kinesis", e);
+      }
+    }));
   }
 
   private void startStreamServer() {
@@ -148,8 +140,6 @@ public class GitHubEventsQuickstart extends QuickStartBase {
       try {
         printStatus(Color.GREEN, "***** Shutting down GitHubEventsQuickStart 
*****");
         runner.stop();
-        _serverStarter.stop();
-        ZkStarter.stopLocalZkServer(_zookeeperInstance);
         FileUtils.deleteDirectory(quickStartDataDir);
       } catch (Exception e) {
         LOGGER.error("Caught exception in shutting down GitHubEvents 
QuickStart", e);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index 3ef0945a75..c65fa0a347 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -19,45 +19,27 @@
 package org.apache.pinot.tools;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
-import java.io.IOException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.AirlineDataStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
 
-
-public class HybridQuickstart extends QuickStartBase {
+public class HybridQuickstart extends Quickstart {
   @Override
   public List<String> types() {
     return Collections.singletonList("HYBRID");
   }
 
-  private StreamDataServerStartable _kafkaStarter;
-  private ZkStarter.ZookeeperInstance _zookeeperInstance;
-  private File _schemaFile;
-  private File _realtimeTableConfigFile;
-  private File _dataFile;
-  private File _ingestionJobSpecFile;
-
   public static void main(String[] args)
       throws Exception {
     List<String> arguments = new ArrayList<>();
@@ -73,97 +55,9 @@ public class HybridQuickstart extends QuickStartBase {
     return overrides;
   }
 
-  private QuickstartTableRequest prepareTableRequest(File baseDir)
-      throws IOException {
-    _schemaFile = new File(baseDir, "airlineStats_schema.json");
-    _ingestionJobSpecFile = new File(baseDir, "ingestionJobSpec.yaml");
-    File tableConfigFile = new File(baseDir, 
"airlineStats_offline_table_config.json");
-
-    ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = 
classLoader.getResource("examples/batch/airlineStats/airlineStats_schema.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, _schemaFile);
-    resource = 
classLoader.getResource("examples/batch/airlineStats/ingestionJobSpec.yaml");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, _ingestionJobSpecFile);
-    resource = 
classLoader.getResource("examples/batch/airlineStats/airlineStats_offline_table_config.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, tableConfigFile);
-
-    _realtimeTableConfigFile = new File(baseDir, 
"airlineStats_realtime_table_config.json");
-    resource = Quickstart.class.getClassLoader()
-        
.getResource("examples/stream/airlineStats/airlineStats_realtime_table_config.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, _realtimeTableConfigFile);
-    resource = Quickstart.class.getClassLoader()
-        
.getResource("examples/stream/airlineStats/sample_data/airlineStats_data.avro");
-    Preconditions.checkNotNull(resource);
-    _dataFile = new File(baseDir, "airlineStats_data.avro");
-    FileUtils.copyURLToFile(resource, _dataFile);
-
-    return new QuickstartTableRequest(baseDir.getAbsolutePath());
-  }
-
-  private void startKafka() {
-    _zookeeperInstance = ZkStarter.startLocalZkServer();
-    try {
-      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
-          KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance));
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
-    }
-    _kafkaStarter.start();
-    _kafkaStarter.createTopic("flights-realtime", 
KafkaStarterUtils.getTopicCreationProps(10));
-  }
-
-  public void execute()
+  @Override
+  public void runSampleQueries(QuickstartRunner runner)
       throws Exception {
-    File quickstartTmpDir = new File(_dataDir, 
String.valueOf(System.currentTimeMillis()));
-    File baseDir = new File(quickstartTmpDir, "airlineStats");
-    File dataDir = new File(baseDir, "data");
-    Preconditions.checkState(dataDir.mkdirs());
-    QuickstartTableRequest bootstrapTableRequest = 
prepareTableRequest(baseDir);
-    final QuickstartRunner runner = new 
QuickstartRunner(Lists.newArrayList(bootstrapTableRequest),
-        1, 1, 1, 1, dataDir, getConfigOverrides());
-    printStatus(Color.YELLOW, "***** Starting Kafka  *****");
-    startKafka();
-    printStatus(Color.YELLOW, "***** Starting airline data stream and 
publishing to Kafka *****");
-    Schema schema = Schema.fromFile(_schemaFile);
-    TableConfig tableConfig = JsonUtils.fileToObject(_realtimeTableConfigFile, 
TableConfig.class);
-    AirlineDataStream stream = new AirlineDataStream(schema, tableConfig, 
_dataFile);
-    stream.run();
-    printStatus(Color.YELLOW, "***** Starting Zookeeper, 1 servers, 1 brokers 
and 1 controller *****");
-    runner.startAll();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-      try {
-        printStatus(Color.GREEN, "***** Shutting down hybrid quick start 
*****");
-        runner.stop();
-        stream.shutdown();
-        _kafkaStarter.stop();
-        ZkStarter.stopLocalZkServer(_zookeeperInstance);
-        FileUtils.deleteDirectory(quickstartTmpDir);
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }));
-    printStatus(Color.YELLOW, "***** Bootstrap airlineStats offline and 
realtime table *****");
-    runner.bootstrapTable();
-
-    printStatus(Color.YELLOW, "***** Pinot Hybrid with hybrid table setup is 
complete *****");
-    printStatus(Color.YELLOW, "***** Sequence of operations *****");
-    printStatus(Color.YELLOW, "*****    1. Started 1 controller instance where 
tenant creation is enabled *****");
-    printStatus(Color.YELLOW, "*****    2. Started 2 servers and 2 brokers 
*****");
-    printStatus(Color.YELLOW, "*****    3. Created a server tenant with 1 
offline and 1 realtime instance *****");
-    printStatus(Color.YELLOW, "*****    4. Created a broker tenant with 2 
instances *****");
-    printStatus(Color.YELLOW, "*****    5. Added a schema *****");
-    printStatus(Color.YELLOW,
-        "*****    6. Created an offline and a realtime table with the tenant 
names created above *****");
-    printStatus(Color.YELLOW, "*****    7. Built and pushed an offline segment 
*****");
-    printStatus(Color.YELLOW,
-        "*****    8. Started publishing a Kafka stream for the realtime 
instance to start consuming *****");
-    printStatus(Color.YELLOW, "*****    9. Sleep 5 Seconds to wait for all 
components brought up *****");
-    Thread.sleep(5000);
-
     String q1 = "select count(*) from airlineStats limit 1";
     printStatus(Color.YELLOW, "Total number of documents in the table");
     printStatus(Color.CYAN, "Query : " + q1);
@@ -198,6 +92,53 @@ public class HybridQuickstart extends QuickStartBase {
     printStatus(Color.CYAN, "Query : " + q5);
     printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q5)));
     printStatus(Color.GREEN, 
"***************************************************");
+  }
+
+  @Override
+  protected String[] getDefaultBatchTableDirectories() {
+    return new String[]{"examples/batch/airlineStats"};
+  }
+
+  @Override
+  protected Map<String, String> getDefaultStreamTableDirectories() {
+    return ImmutableMap.of("airlineStats", "examples/stream/airlineStats");
+  }
+
+  public void execute()
+      throws Exception {
+    File quickstartTmpDir = new File(_dataDir, 
String.valueOf(System.currentTimeMillis()));
+    File quickstartRunnerDir = new File(quickstartTmpDir, "quickstart");
+    Preconditions.checkState(quickstartRunnerDir.mkdirs());
+    Set<QuickstartTableRequest> quickstartTableRequests = new HashSet<>();
+    
quickstartTableRequests.addAll(bootstrapOfflineTableDirectories(quickstartTmpDir));
+    
quickstartTableRequests.addAll(bootstrapStreamTableDirectories(quickstartTmpDir));
+    final QuickstartRunner runner =
+        new QuickstartRunner(new ArrayList<>(quickstartTableRequests), 1, 1, 
1, 1, quickstartRunnerDir,
+            getConfigOverrides());
+
+    startKafka();
+    startAllDataStreams(_kafkaStarter, quickstartTmpDir);
+
+    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker, 
server and minion *****");
+    runner.startAll();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
+        runner.stop();
+        FileUtils.deleteDirectory(quickstartTmpDir);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }));
+
+    printStatus(Color.CYAN, "***** Bootstrap all tables *****");
+    runner.bootstrapTable();
+
+    printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to 
get populated *****");
+    Thread.sleep(5000);
+
+    printStatus(Color.YELLOW, "***** Realtime quickstart setup complete 
*****");
+    runSampleQueries(runner);
 
     printStatus(Color.GREEN, "You can always go to http://localhost:9000 to 
play around in the query console");
   }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
index 19fd63e72b..a581d5c89a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
@@ -25,20 +25,14 @@ import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.spi.plugin.PluginManager;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.MeetupRsvpStream;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
-
 
 public class PartialUpsertQuickStart extends QuickStartBase {
-  private StreamDataServerStartable _kafkaStarter;
 
   public static void main(String[] args)
       throws Exception {
@@ -75,15 +69,7 @@ public class PartialUpsertQuickStart extends QuickStartBase {
     final QuickstartRunner runner =
         new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, 
getConfigOverrides());
 
-    printStatus(Color.CYAN, "***** Starting Kafka *****");
-    final ZkStarter.ZookeeperInstance zookeeperInstance = 
ZkStarter.startLocalZkServer();
-    try {
-      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
-          KafkaStarterUtils.getDefaultKafkaConfiguration());
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
-    }
-    _kafkaStarter.start();
+    startKafka();
     _kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
     printStatus(Color.CYAN, "***** Starting  meetup data stream and publishing 
to Kafka *****");
     MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
@@ -95,8 +81,6 @@ public class PartialUpsertQuickStart extends QuickStartBase {
         printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
         runner.stop();
         meetupRSVPProvider.stopPublishing();
-        _kafkaStarter.stop();
-        ZkStarter.stopLocalZkServer(zookeeperInstance);
         FileUtils.deleteDirectory(quickstartTmpDir);
       } catch (Exception e) {
         e.printStackTrace();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
index 225aeb40fd..bab8c768cb 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
@@ -24,15 +24,25 @@ import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.streams.AirlineDataStream;
+import org.apache.pinot.tools.streams.MeetupRsvpStream;
 import org.apache.pinot.tools.utils.JarUtils;
+import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.apache.pinot.tools.utils.PinotConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,10 +73,19 @@ public abstract class QuickStartBase {
       "examples/batch/githubComplexTypeEvents"
   };
 
+  protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES 
= ImmutableMap.of(
+      "airlineStats", "examples/stream/airlineStats",
+      "githubEvents", "examples/minions/stream/githubEvents",
+      "meetupRsvp", "examples/stream/meetupRsvp",
+      "meetupRsvpJson", "examples/stream/meetupRsvpJson",
+      "meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType");
+
   protected File _dataDir = FileUtils.getTempDirectory();
   protected String[] _bootstrapDataDirs;
   protected String _zkExternalAddress;
   protected String _configFilePath;
+  protected StreamDataServerStartable _kafkaStarter;
+  protected ZkStarter.ZookeeperInstance _zookeeperInstance;
 
   public QuickStartBase setDataDir(String dataDir) {
     _dataDir = new File(dataDir);
@@ -185,6 +204,25 @@ public abstract class QuickStartBase {
     }
   }
 
+  protected List<QuickstartTableRequest> bootstrapStreamTableDirectories(File 
quickstartTmpDir)
+      throws IOException {
+    List<QuickstartTableRequest> quickstartTableRequests = new ArrayList<>();
+    for (Map.Entry<String, String> entry : 
getDefaultStreamTableDirectories().entrySet()) {
+      String tableName = entry.getKey();
+      String directory = entry.getValue();
+      File baseDir = new File(quickstartTmpDir, tableName);
+      File dataDir = new File(baseDir, "rawdata");
+      dataDir.mkdirs();
+      if (useDefaultBootstrapTableDir()) {
+        copyResourceTableToTmpDirectory(directory, tableName, baseDir, 
dataDir, true);
+      } else {
+        copyFilesystemTableToTmpDirectory(directory, tableName, baseDir);
+      }
+      quickstartTableRequests.add(new 
QuickstartTableRequest(baseDir.getAbsolutePath()));
+    }
+    return quickstartTableRequests;
+  }
+
   private static void copyFilesystemTableToTmpDirectory(String sourcePath, 
String tableName, File baseDir)
       throws IOException {
     File fileDb = new File(sourcePath);
@@ -246,4 +284,130 @@ public abstract class QuickStartBase {
     }
     return responseBuilder.toString();
   }
+
+  protected Map<String, String> getDefaultStreamTableDirectories() {
+    return DEFAULT_STREAM_TABLE_DIRECTORIES;
+  }
+
+  protected static void publishStreamDataToKafka(String tableName, File 
dataDir)
+      throws Exception {
+    switch (tableName) {
+      case "githubEvents":
+        publishGithubEventsToKafka("githubEvents", new File(dataDir, 
"/rawdata/2021-07-21-few-hours.json"));
+        break;
+      default:
+        break;
+    }
+  }
+
+  protected static void publishGithubEventsToKafka(String topicName, File 
dataFile)
+      throws Exception {
+    Properties properties = new Properties();
+    properties.put("metadata.broker.list", 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+    properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
+    properties.put("request.required.acks", "1");
+    StreamDataProducer producer =
+        
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
 properties);
+    try {
+      LineIterator dataStream = FileUtils.lineIterator(dataFile);
+
+      while (dataStream.hasNext()) {
+        producer.produce(topicName, 
dataStream.nextLine().getBytes(StandardCharsets.UTF_8));
+      }
+    } finally {
+      producer.close();
+    }
+  }
+
+  protected void startKafka() {
+    printStatus(Quickstart.Color.CYAN, "***** Starting Kafka *****");
+    _zookeeperInstance = ZkStarter.startLocalZkServer();
+    try {
+      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
+          KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
+    }
+    _kafkaStarter.start();
+
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        printStatus(Quickstart.Color.GREEN, "***** Shutting down kafka and 
zookeeper *****");
+        _kafkaStarter.stop();
+        ZkStarter.stopLocalZkServer(_zookeeperInstance);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }));
+
+    printStatus(Quickstart.Color.CYAN, "***** Kafka Started *****");
+  }
+
+  public void startAllDataStreams(StreamDataServerStartable kafkaStarter, File 
quickstartTmpDir)
+      throws Exception {
+    for (String streamName : getDefaultStreamTableDirectories().keySet()) {
+      switch (streamName) {
+        case "airlineStats":
+          kafkaStarter.createTopic("flights-realtime", 
KafkaStarterUtils.getTopicCreationProps(10));
+          printStatus(Quickstart.Color.CYAN, "***** Starting airlineStats data 
stream and publishing to Kafka *****");
+          AirlineDataStream airlineDataStream = new AirlineDataStream(new 
File(quickstartTmpDir, "airlineStats"));
+          airlineDataStream.run();
+          Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+              airlineDataStream.shutdown();
+            } catch (Exception e) {
+              e.printStackTrace();
+            }
+          }));
+          break;
+        case "meetupRsvp":
+          kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(10));
+          printStatus(Quickstart.Color.CYAN, "***** Starting meetup data 
stream and publishing to Kafka *****");
+          MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
+          meetupRSVPProvider.run();
+          Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+              meetupRSVPProvider.stopPublishing();
+            } catch (Exception e) {
+              e.printStackTrace();
+            }
+          }));
+          break;
+        case "meetupRsvpJson":
+          kafkaStarter.createTopic("meetupRSVPJsonEvents", 
KafkaStarterUtils.getTopicCreationProps(10));
+          printStatus(Quickstart.Color.CYAN, "***** Starting meetupRsvpJson 
data stream and publishing to Kafka *****");
+          MeetupRsvpStream meetupRSVPJsonProvider = new 
MeetupRsvpStream("meetupRSVPJsonEvents");
+          meetupRSVPJsonProvider.run();
+          Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+              meetupRSVPJsonProvider.stopPublishing();
+            } catch (Exception e) {
+              e.printStackTrace();
+            }
+          }));
+          break;
+        case "meetupRsvpComplexType":
+          kafkaStarter.createTopic("meetupRSVPComplexTypeEvents", 
KafkaStarterUtils.getTopicCreationProps(10));
+          printStatus(Quickstart.Color.CYAN,
+              "***** Starting meetupRSVPComplexType data stream and publishing 
to Kafka *****");
+          MeetupRsvpStream meetupRSVPComplexTypeProvider = new 
MeetupRsvpStream("meetupRSVPComplexTypeEvents");
+          meetupRSVPComplexTypeProvider.run();
+          Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+              meetupRSVPComplexTypeProvider.stopPublishing();
+            } catch (Exception e) {
+              e.printStackTrace();
+            }
+          }));
+          break;
+        case "githubEvents":
+          kafkaStarter.createTopic("githubEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
+          printStatus(Quickstart.Color.CYAN, "***** Starting githubEvents data 
stream and publishing to Kafka *****");
+          publishStreamDataToKafka("githubEvents", new File(quickstartTmpDir, 
"githubEvents"));
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown stream name: " + 
streamName);
+      }
+    }
+  }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
index f635dc098b..f0b66ef1df 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.tools;
 
 import java.io.File;
+import java.util.Objects;
 import org.apache.pinot.spi.config.table.TableType;
 
 
@@ -103,4 +104,24 @@ public class QuickstartTableRequest {
         + ", tableRequestFile = " + _tableRequestFile + ", ingestionJobFile = " + _ingestionJobFile
         + ", bootstrapTableDir = " + _bootstrapTableDir + " }";
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    QuickstartTableRequest that = (QuickstartTableRequest) o;
+    return Objects.equals(_tableName, that._tableName) && _tableType == that._tableType
+        && Objects.equals(_schemaFile, that._schemaFile) && Objects.equals(_tableRequestFile,
+        that._tableRequestFile) && Objects.equals(_ingestionJobFile, that._ingestionJobFile)
+        && Objects.equals(_bootstrapTableDir, that._bootstrapTableDir);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(_tableName, _tableType, _schemaFile, _tableRequestFile, _ingestionJobFile, _bootstrapTableDir);
+  }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
index 2d9e6f7d09..0027008c01 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
@@ -18,33 +18,20 @@
  */
 package org.apache.pinot.tools;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
 
-
-public class RealtimeComplexTypeHandlingQuickStart extends QuickStartBase {
+public class RealtimeComplexTypeHandlingQuickStart extends RealtimeQuickStart {
   @Override
   public List<String> types() {
-      return Arrays.asList("REALTIME_COMPLEX_TYPE", "REALTIME-COMPLEX-TYPE",
-              "STREAM_COMPLEX_TYPE", "STREAM-COMPLEX-TYPE");
+    return Arrays.asList("REALTIME_COMPLEX_TYPE", "REALTIME-COMPLEX-TYPE",
+        "STREAM_COMPLEX_TYPE", "STREAM-COMPLEX-TYPE");
   }
-  private StreamDataServerStartable _kafkaStarter;
 
   public static void main(String[] args)
       throws Exception {
@@ -54,71 +41,15 @@ public class RealtimeComplexTypeHandlingQuickStart extends QuickStartBase {
     PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
   }
 
-  public void execute()
+  @Override
+  public void runSampleQueries(QuickstartRunner runner)
       throws Exception {
-    File quickstartTmpDir = new File(_dataDir, 
String.valueOf(System.currentTimeMillis()));
-    File baseDir = new File(quickstartTmpDir, "meetupRsvp");
-    File dataDir = new File(baseDir, "data");
-    Preconditions.checkState(dataDir.mkdirs());
-
-    File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
-    File tableConfigFile = new File(baseDir, 
"meetupRsvp_realtime_table_config.json");
-
-    ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = 
classLoader.getResource("examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, schemaFile);
-    resource =
-        
classLoader.getResource("examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, tableConfigFile);
-
-    QuickstartTableRequest request = new 
QuickstartTableRequest(baseDir.getAbsolutePath());
-    QuickstartRunner runner =
-        new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, 
getConfigOverrides());
-
-    printStatus(Color.CYAN, "***** Starting Kafka *****");
-    ZkStarter.ZookeeperInstance zookeeperInstance = 
ZkStarter.startLocalZkServer();
-    try {
-      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
-          KafkaStarterUtils.getDefaultKafkaConfiguration());
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
-    }
-    _kafkaStarter.start();
-    _kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
-    printStatus(Color.CYAN, "***** Starting meetup data stream and publishing 
to Kafka *****");
-    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
-    meetupRSVPProvider.run();
-    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and 
broker *****");
-    runner.startAll();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-      try {
-        printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
-        runner.stop();
-        meetupRSVPProvider.stopPublishing();
-        _kafkaStarter.stop();
-        ZkStarter.stopLocalZkServer(zookeeperInstance);
-        FileUtils.deleteDirectory(quickstartTmpDir);
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }));
-    printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
-    runner.bootstrapTable();
-    printStatus(Color.CYAN, "***** Waiting for 20 seconds for a few events to 
get populated *****");
-    Thread.sleep(20000);
-
-    printStatus(Color.YELLOW, "***** Realtime json-index quickstart setup 
complete *****");
-
     String q1 =
-        "select \"group.group_topics.urlkey\", \"group.group_topics.topic_name\", \"group.group_id\" from meetupRsvp "
-            + "limit 10";
+        "select \"group.group_topics.urlkey\", \"group.group_topics.topic_name\", \"group.group_id\" from "
+            + "meetupRsvpComplexType limit 10";
     printStatus(Color.YELLOW, "Events related to fitness");
     printStatus(Color.CYAN, "Query : " + q1);
     printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
     printStatus(Color.GREEN, "***************************************************");
-
-    printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
index e77d611fb0..75d643497e 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
@@ -18,33 +18,31 @@
  */
 package org.apache.pinot.tools;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
 
-
-public class RealtimeJsonIndexQuickStart extends QuickStartBase {
+public class RealtimeJsonIndexQuickStart extends RealtimeQuickStart {
   @Override
   public List<String> types() {
     return Arrays.asList("REALTIME_JSON_INDEX", "REALTIME-JSON-INDEX", "STREAM_JSON_INDEX", "STREAM-JSON-INDEX");
   }
 
-  private StreamDataServerStartable _kafkaStarter;
+  @Override
+  public void runSampleQueries(QuickstartRunner runner)
+      throws Exception {
+    String q1 = "select json_extract_scalar(event_json, '$.event_name', 'STRING') from meetupRsvpJson where json_match"
+        + "(group_json, '\"$.group_topics[*].topic_name\"=''topic_name0''') limit 10";
+    printStatus(Color.YELLOW, "Events related to fitness");
+    printStatus(Color.CYAN, "Query : " + q1);
+    printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
+
+    printStatus(Color.GREEN, "***************************************************");
+  }
 
   public static void main(String[] args)
       throws Exception {
@@ -53,70 +51,4 @@ public class RealtimeJsonIndexQuickStart extends QuickStartBase {
     arguments.addAll(Arrays.asList(args));
     PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
   }
-
-  public void execute()
-      throws Exception {
-    File quickstartTmpDir = new File(_dataDir, 
String.valueOf(System.currentTimeMillis()));
-    File baseDir = new File(quickstartTmpDir, "meetupRsvp");
-    File dataDir = new File(baseDir, "data");
-    Preconditions.checkState(dataDir.mkdirs());
-
-    File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
-    File tableConfigFile = new File(baseDir, 
"meetupRsvp_realtime_table_config.json");
-
-    ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = 
classLoader.getResource("examples/stream/meetupRsvp/json_meetupRsvp_schema.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, schemaFile);
-    resource = 
classLoader.getResource("examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, tableConfigFile);
-
-    QuickstartTableRequest request = new 
QuickstartTableRequest(baseDir.getAbsolutePath());
-    QuickstartRunner runner =
-        new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, 
getConfigOverrides());
-
-    printStatus(Color.CYAN, "***** Starting Kafka *****");
-    ZkStarter.ZookeeperInstance zookeeperInstance = 
ZkStarter.startLocalZkServer();
-    try {
-      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
-          KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
-    }
-    _kafkaStarter.start();
-    _kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
-    printStatus(Color.CYAN, "***** Starting meetup data stream and publishing 
to Kafka *****");
-    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
-    meetupRSVPProvider.run();
-    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and 
broker *****");
-    runner.startAll();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-      try {
-        printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
-        runner.stop();
-        meetupRSVPProvider.stopPublishing();
-        _kafkaStarter.stop();
-        ZkStarter.stopLocalZkServer(zookeeperInstance);
-        FileUtils.deleteDirectory(quickstartTmpDir);
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }));
-    printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
-    runner.bootstrapTable();
-    printStatus(Color.CYAN, "***** Waiting for 20 seconds for a few events to 
get populated *****");
-    Thread.sleep(20000);
-
-    printStatus(Color.YELLOW, "***** Realtime json-index quickstart setup 
complete *****");
-
-    String q1 = "select json_extract_scalar(event_json, '$.event_name', 
'STRING') from meetupRsvp where json_match"
-        + "(group_json, '\"$.group_topics[*].topic_name\"=''Fitness''') limit 
10";
-    printStatus(Color.YELLOW, "Events related to fitness");
-    printStatus(Color.CYAN, "Query : " + q1);
-    printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
-
-    printStatus(Color.GREEN, 
"***************************************************");
-    printStatus(Color.GREEN, "You can always go to http://localhost:9000 to 
play around in the query console");
-  }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 626961c1ee..be8db25933 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -19,23 +19,14 @@
 package org.apache.pinot.tools;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import java.io.File;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
 
 
 public class RealtimeQuickStart extends QuickStartBase {
@@ -44,8 +35,6 @@ public class RealtimeQuickStart extends QuickStartBase {
     return Arrays.asList("REALTIME", "STREAM");
   }
 
-  private StreamDataServerStartable _kafkaStarter;
-
   public static void main(String[] args)
       throws Exception {
     List<String> arguments = new ArrayList<>();
@@ -54,62 +43,9 @@ public class RealtimeQuickStart extends QuickStartBase {
     PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
   }
 
-  public void execute()
+  @Override
+  public void runSampleQueries(QuickstartRunner runner)
       throws Exception {
-    File quickstartTmpDir = new File(_dataDir, 
String.valueOf(System.currentTimeMillis()));
-    File baseDir = new File(quickstartTmpDir, "meetupRsvp");
-    File dataDir = new File(baseDir, "rawdata");
-    Preconditions.checkState(dataDir.mkdirs());
-
-    File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
-    File tableConfigFile = new File(baseDir, 
"meetupRsvp_realtime_table_config.json");
-
-    ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = 
classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_schema.json");
-    com.google.common.base.Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, schemaFile);
-    resource = 
classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json");
-    com.google.common.base.Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, tableConfigFile);
-
-    QuickstartTableRequest request = new 
QuickstartTableRequest(baseDir.getAbsolutePath());
-    final QuickstartRunner runner =
-        new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, 
getConfigOverrides());
-
-    printStatus(Color.CYAN, "***** Starting Kafka *****");
-    final ZkStarter.ZookeeperInstance zookeeperInstance = 
ZkStarter.startLocalZkServer();
-    try {
-      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
-          KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
-    }
-    _kafkaStarter.start();
-    _kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(10));
-    printStatus(Color.CYAN, "***** Starting meetup data stream and publishing 
to Kafka *****");
-    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
-    meetupRSVPProvider.run();
-    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and 
broker *****");
-    runner.startAll();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-      try {
-        printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
-        runner.stop();
-        meetupRSVPProvider.stopPublishing();
-        _kafkaStarter.stop();
-        ZkStarter.stopLocalZkServer(zookeeperInstance);
-        FileUtils.deleteDirectory(quickstartTmpDir);
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }));
-    printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
-    runner.bootstrapTable();
-    printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to 
get populated *****");
-    Thread.sleep(5000);
-
-    printStatus(Color.YELLOW, "***** Realtime quickstart setup complete 
*****");
-
     String q1 = "select count(*) from meetupRsvp limit 1";
     printStatus(Color.YELLOW, "Total number of documents in the table");
     printStatus(Color.CYAN, "Query : " + q1);
@@ -141,6 +77,40 @@ public class RealtimeQuickStart extends QuickStartBase {
     printStatus(Color.CYAN, "Query : " + q5);
     printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q5)));
     printStatus(Color.GREEN, "***************************************************");
+  }
+
+  public void execute()
+      throws Exception {
+    File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis()));
+    File quickstartRunnerDir = new File(quickstartTmpDir, "quickstart");
+    Preconditions.checkState(quickstartRunnerDir.mkdirs());
+    List<QuickstartTableRequest> quickstartTableRequests = bootstrapStreamTableDirectories(quickstartTmpDir);
+    final QuickstartRunner runner =
+        new QuickstartRunner(quickstartTableRequests, 1, 1, 1, 1, quickstartRunnerDir, getConfigOverrides());
+
+    startKafka();
+    startAllDataStreams(_kafkaStarter, quickstartTmpDir);
+
+    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker, server and minion *****");
+    runner.startAll();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        printStatus(Color.GREEN, "***** Shutting down realtime quick start *****");
+        runner.stop();
+        FileUtils.deleteDirectory(quickstartTmpDir);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }));
+
+    printStatus(Color.CYAN, "***** Bootstrap all tables *****");
+    runner.bootstrapTable();
+
+    printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****");
+    Thread.sleep(5000);
+
+    printStatus(Color.YELLOW, "***** Realtime quickstart setup complete *****");
+    runSampleQueries(runner);
 
     printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
   }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java
index 60786d82a3..b51baf6179 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java
@@ -18,122 +18,29 @@
  */
 package org.apache.pinot.tools;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.LineIterator;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProducer;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
 
 
 /**
  * This quickstart shows how RealtimeToOfflineSegmentsTask and MergeRollupTask minion
  * tasks continuously optimize segments as data gets ingested into Realtime table.
  */
-public class RealtimeQuickStartWithMinion extends QuickStartBase {
+public class RealtimeQuickStartWithMinion extends RealtimeQuickStart {
   @Override
   public List<String> types() {
     return Arrays.asList("REALTIME_MINION", "REALTIME-MINION");
   }
 
-  private StreamDataServerStartable _kafkaStarter;
-
-  public static void main(String[] args)
-      throws Exception {
-    List<String> arguments = new ArrayList<>();
-    arguments.addAll(Arrays.asList("QuickStart", "-type", "REALTIME-MINION"));
-    arguments.addAll(Arrays.asList(args));
-    PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
-  }
-
-  public Map<String, Object> getConfigOverrides() {
-    Map<String, Object> properties = new HashMap<>(super.getConfigOverrides());
-    properties.putIfAbsent("controller.task.scheduler.enabled", true);
-    return properties;
-  }
-
-  public void execute()
+  @Override
+  public void runSampleQueries(QuickstartRunner runner)
       throws Exception {
-    File quickstartTmpDir = new File(_dataDir, 
String.valueOf(System.currentTimeMillis()));
-    File baseDir = new File(quickstartTmpDir, "githubEvents");
-    File dataDir = new File(baseDir, "rawdata");
-    Preconditions.checkState(dataDir.mkdirs());
-
-    File schemaFile = new File(baseDir, "githubEvents_schema.json");
-    File realtimeTableConfigFile = new File(baseDir, 
"githubEvents_realtime_table_config.json");
-    File offlineTableConfigFile = new File(baseDir, 
"githubEvents_offline_table_config.json");
-    File inputDataFile = new File(baseDir, "2021-07-21-few-hours.json");
-
-    ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = 
classLoader.getResource("examples/minions/stream/githubEvents/githubEvents_schema.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, schemaFile);
-    resource = 
classLoader.getResource("examples/minions/stream/githubEvents/githubEvents_realtime_table_config.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, realtimeTableConfigFile);
-    resource = 
classLoader.getResource("examples/minions/stream/githubEvents/githubEvents_offline_table_config.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, offlineTableConfigFile);
-    resource = Quickstart.class.getClassLoader()
-        
.getResource("examples/minions/stream/githubEvents/rawdata/2021-07-21-few-hours.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, inputDataFile);
-
-    QuickstartTableRequest request = new 
QuickstartTableRequest(baseDir.getAbsolutePath());
-    QuickstartRunner runner =
-        new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1,
-            dataDir, true, null, getConfigOverrides(), null, true);
-
-    printStatus(Color.CYAN, "***** Starting Kafka *****");
-    final ZkStarter.ZookeeperInstance zookeeperInstance = 
ZkStarter.startLocalZkServer();
-    try {
-      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
-          KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
-    }
-    _kafkaStarter.start();
-
-    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and 
broker *****");
-    runner.startAll();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-      try {
-        printStatus(Color.GREEN, "***** Shutting down realtime-minion quick 
start *****");
-        runner.stop();
-        _kafkaStarter.stop();
-        ZkStarter.stopLocalZkServer(zookeeperInstance);
-        FileUtils.deleteDirectory(quickstartTmpDir);
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }));
-
-    printStatus(Color.CYAN, "***** Sending events to Kafka *****");
-    _kafkaStarter.createTopic("githubEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
-    publishGithubEventsToKafka("githubEvents", inputDataFile);
-
-    printStatus(Color.CYAN, "***** Bootstrap githubEvents tables *****");
-    runner.bootstrapTable();
-    printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to 
get populated *****");
-    Thread.sleep(5000);
-
     printStatus(Color.YELLOW, "***** Realtime-minion quickstart setup complete *****");
 
     String q1 = "select count(*) from githubEvents limit 1";
@@ -142,26 +49,21 @@ public class RealtimeQuickStartWithMinion extends QuickStartBase {
     printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
     printStatus(Color.GREEN, "***************************************************");
 
-    printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
     printStatus(Color.GREEN, "In particular, you will find that OFFLINE table gets segments from REALTIME table;");
     printStatus(Color.GREEN, "and segments in OFFLINE table get merged into larger ones within a few minutes.");
   }
 
-  private static void publishGithubEventsToKafka(String topicName, File dataFile)
+  public static void main(String[] args)
       throws Exception {
-    Properties properties = new Properties();
-    properties.put("metadata.broker.list", 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
-    properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
-    properties.put("request.required.acks", "1");
-    StreamDataProducer producer =
-        
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
 properties);
-    try {
-      LineIterator dataStream = FileUtils.lineIterator(dataFile);
-      while (dataStream.hasNext()) {
-        producer.produce(topicName, 
dataStream.nextLine().getBytes(StandardCharsets.UTF_8));
-      }
-    } finally {
-      producer.close();
-    }
+    List<String> arguments = new ArrayList<>();
+    arguments.addAll(Arrays.asList("QuickStart", "-type", "REALTIME-MINION"));
+    arguments.addAll(Arrays.asList(args));
+    PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
+  }
+
+  public Map<String, Object> getConfigOverrides() {
+    Map<String, Object> properties = new HashMap<>(super.getConfigOverrides());
+    properties.putIfAbsent("controller.task.scheduler.enabled", true);
+    return properties;
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
index 383d2593ec..22af89ff27 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
@@ -26,9 +26,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
@@ -36,8 +33,6 @@ import org.apache.pinot.tools.streams.MeetupRsvpStream;
 import org.apache.pinot.tools.streams.RsvpSourceGenerator;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
-
 
 public class UpsertJsonQuickStart extends QuickStartBase {
   @Override
@@ -45,8 +40,6 @@ public class UpsertJsonQuickStart extends QuickStartBase {
     return Arrays.asList("UPSERT_JSON_INDEX", "UPSERT-JSON-INDEX");
   }
 
-  private StreamDataServerStartable _kafkaStarter;
-
   public static void main(String[] args)
       throws Exception {
     List<String> arguments = new ArrayList<>();
@@ -77,15 +70,7 @@ public class UpsertJsonQuickStart extends QuickStartBase {
     QuickstartRunner runner =
         new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, getConfigOverrides());
 
-    printStatus(Color.CYAN, "***** Starting Kafka *****");
-    ZkStarter.ZookeeperInstance zookeeperInstance = 
ZkStarter.startLocalZkServer();
-    try {
-      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
-          KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
-    }
-    _kafkaStarter.start();
+    startKafka();
     _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2));
     printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****");
     MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(RsvpSourceGenerator.KeyColumn.RSVP_ID);
@@ -97,8 +82,6 @@ public class UpsertJsonQuickStart extends QuickStartBase {
         printStatus(Color.GREEN, "***** Shutting down realtime quick start *****");
         runner.stop();
         meetupRSVPProvider.stopPublishing();
-        _kafkaStarter.stop();
-        ZkStarter.stopLocalZkServer(zookeeperInstance);
         FileUtils.deleteDirectory(quickstartTmpDir);
       } catch (Exception e) {
         e.printStackTrace();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
index ccf219325d..d67bbcdaf6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
@@ -27,17 +27,12 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.MeetupRsvpStream;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
-
 
 public class UpsertQuickStart extends QuickStartBase {
   @Override
@@ -45,8 +40,6 @@ public class UpsertQuickStart extends QuickStartBase {
     return Collections.singletonList("UPSERT");
   }
 
-  private StreamDataServerStartable _kafkaStarter;
-
   public static void main(String[] args)
       throws Exception {
     List<String> arguments = new ArrayList<>();
@@ -77,15 +70,7 @@ public class UpsertQuickStart extends QuickStartBase {
     final QuickstartRunner runner
         = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, 
dataDir, getConfigOverrides());
 
-    printStatus(Color.CYAN, "***** Starting Kafka *****");
-    final ZkStarter.ZookeeperInstance zookeeperInstance = 
ZkStarter.startLocalZkServer();
-    try {
-      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
-          KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
-    }
-    _kafkaStarter.start();
+    startKafka();
     _kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
     printStatus(Color.CYAN, "***** Starting  meetup data stream and publishing 
to Kafka *****");
     MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
@@ -97,8 +82,6 @@ public class UpsertQuickStart extends QuickStartBase {
         printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
         runner.stop();
         meetupRSVPProvider.stopPublishing();
-        _kafkaStarter.stop();
-        ZkStarter.stopLocalZkServer(zookeeperInstance);
         FileUtils.deleteDirectory(quickstartTmpDir);
       } catch (Exception e) {
         e.printStackTrace();
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index 2dee145ac0..9b4aea8060 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -25,6 +25,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.QuickStartBase;
 import org.apache.pinot.tools.Quickstart;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
@@ -42,11 +43,6 @@ public class AirlineDataStream {
   private StreamDataProducer _producer;
   private PinotRealtimeSource _pinotStream;
 
-  public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File 
avroFile)
-      throws Exception {
-    this(pinotSchema, tableConfig, avroFile, getDefaultKafkaProducer());
-  }
-
   public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File 
avroFile, StreamDataProducer producer)
       throws IOException {
     _pinotSchema = pinotSchema;
@@ -63,6 +59,13 @@ public class AirlineDataStream {
             + "every 60 events (which is approximately 60 seconds) *****");
   }
 
+  public AirlineDataStream(File baseDir)
+      throws Exception {
+    this(Schema.fromFile(new File(baseDir, "airlineStats_schema.json")),
+        JsonUtils.fileToObject(new File(baseDir, 
"airlineStats_realtime_table_config.json"), TableConfig.class),
+        new File(baseDir, "rawdata/airlineStats_data.avro"), 
getDefaultKafkaProducer());
+  }
+
   public static StreamDataProducer getDefaultKafkaProducer()
       throws Exception {
     Properties properties = new Properties();
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index f42ffd7fe9..527ca283fb 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -30,23 +30,35 @@ import org.slf4j.LoggerFactory;
 public class MeetupRsvpStream {
   protected static final Logger LOGGER = 
LoggerFactory.getLogger(MeetupRsvpStream.class);
   private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents";
-  protected String _topicName = DEFAULT_TOPIC_NAME;
+  protected String _topicName;
 
   protected PinotRealtimeSource _pinotRealtimeSource;
 
   public MeetupRsvpStream()
       throws Exception {
-    this(false);
+    this(DEFAULT_TOPIC_NAME, RsvpSourceGenerator.KeyColumn.NONE);
   }
 
   public MeetupRsvpStream(boolean partitionByKey)
       throws Exception {
     // calling this constructor means that we wish to use EVENT_ID as key. 
RsvpId is used by MeetupRsvpJsonStream
-    this(partitionByKey ? RsvpSourceGenerator.KeyColumn.EVENT_ID : 
RsvpSourceGenerator.KeyColumn.NONE);
+    this(DEFAULT_TOPIC_NAME,
+        partitionByKey ? RsvpSourceGenerator.KeyColumn.EVENT_ID : 
RsvpSourceGenerator.KeyColumn.NONE);
+  }
+
+  public MeetupRsvpStream(String topicName)
+      throws Exception {
+    this(topicName, RsvpSourceGenerator.KeyColumn.NONE);
   }
 
   public MeetupRsvpStream(RsvpSourceGenerator.KeyColumn keyColumn)
       throws Exception {
+    this(DEFAULT_TOPIC_NAME, keyColumn);
+  }
+
+  public MeetupRsvpStream(String topicName, RsvpSourceGenerator.KeyColumn 
keyColumn)
+      throws Exception {
+    _topicName = topicName;
     Properties properties = new Properties();
     properties.put("metadata.broker.list", 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
index d192d29d1b..cf04a9acbe 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
@@ -19,6 +19,7 @@
 
 package org.apache.pinot.tools.streams;
 
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import java.time.LocalDateTime;
@@ -50,17 +51,62 @@ public class RsvpSourceGenerator implements 
PinotSourceDataGenerator {
   public RSVP createMessage() {
     String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + "";
     ObjectNode json = JsonUtils.newObjectNode();
-    json.put("venue_name", "venue_name" + 
ThreadLocalRandom.current().nextInt());
-    json.put("event_name", "event_name" + 
ThreadLocalRandom.current().nextInt());
+    ObjectNode eventJson = JsonUtils.newObjectNode();
+    json.set("event", eventJson);
+    ObjectNode groupJson = JsonUtils.newObjectNode();
+    json.set("group", groupJson);
+    ObjectNode venueJson = JsonUtils.newObjectNode();
+    json.set("venue", venueJson);
+
+    String venueName = "venue_name" + ThreadLocalRandom.current().nextInt();
+    venueJson.put("venue_name", venueName);
+    json.put("venue_name", venueName);
+
+    String eventName = "event_name" + ThreadLocalRandom.current().nextInt();
+    eventJson.put("event_name", eventName);
+    json.put("event_name", eventName);
+
     json.put("event_id", eventId);
     json.put("event_time", 
DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
-    json.put("group_city", "group_city" + 
ThreadLocalRandom.current().nextInt(1000));
-    json.put("group_country", "group_country" + 
ThreadLocalRandom.current().nextInt(100));
-    json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong()));
-    json.put("group_name", "group_name" + 
ThreadLocalRandom.current().nextInt());
-    json.put("group_lat", ThreadLocalRandom.current().nextDouble(-90.0, 90.0));
-    json.put("group_lon", ThreadLocalRandom.current().nextDouble(180.0));
+
+    ArrayNode groupTopicsJson = JsonUtils.newArrayNode();
+    groupJson.set("group_topics", groupTopicsJson);
+    for (int i = 0; i < ThreadLocalRandom.current().nextInt(5) + 1; i++) {
+      ObjectNode groupTopicJson = JsonUtils.newObjectNode();
+      groupTopicJson.put("topic_name", "topic_name" + 
ThreadLocalRandom.current().nextInt(10));
+      groupTopicJson.put("urlkey", "http://group-url-" + 
ThreadLocalRandom.current().nextInt(1000));
+      groupTopicsJson.add(groupTopicJson);
+    }
+
+    String groupCity = "group_city" + 
ThreadLocalRandom.current().nextInt(1000);
+    groupJson.put("group_city", groupCity);
+    json.put("group_city", groupCity);
+
+    String groupCountry = "group_country" + 
ThreadLocalRandom.current().nextInt(100);
+    groupJson.put("group_country", groupCountry);
+    json.put("group_country", groupCountry);
+
+    long groupId = Math.abs(ThreadLocalRandom.current().nextLong());
+    groupJson.put("group_id", groupId);
+    json.put("group_id", groupId);
+
+    String groupName = "group_name" + ThreadLocalRandom.current().nextInt();
+    groupJson.put("group_name", groupName);
+    json.put("group_name", groupName);
+
+    double groupLat = ThreadLocalRandom.current().nextDouble(-90.0, 90.0);
+    groupJson.put("group_lat", groupLat);
+    json.put("group_lat", groupLat);
+
+    double groupLon = ThreadLocalRandom.current().nextDouble(-90.0, 90.0);
+    groupJson.put("group_lon", groupLon);
+    json.put("group_lon", groupLon);
+
     json.put("mtime", DateTime.now().getMillis());
+
+    json.put("rsvp_id", ThreadLocalRandom.current().nextLong());
+    json.put("guests", ThreadLocalRandom.current().nextInt(100));
+
     json.put("rsvp_count", 1);
     return new RSVP(eventId, eventId, json);
   }
@@ -91,6 +137,7 @@ public class RsvpSourceGenerator implements 
PinotSourceDataGenerator {
   public void close()
       throws Exception {
   }
+
   public enum KeyColumn {
     NONE,
     EVENT_ID,
diff --git 
a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
index cb5d690628..94da8849f2 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
@@ -29,7 +29,33 @@
       "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
     }
   },
+  "fieldConfigList": [
+    {
+      "name": "ts",
+      "encodingType": "DICTIONARY",
+      "indexTypes": ["TIMESTAMP"],
+      "timestampConfig": {
+        "granularities": [
+          "DAY",
+          "WEEK",
+          "MONTH"
+        ]
+      }
+    }
+  ],
   "metadata": {
     "customConfigs": {}
+  },
+  "ingestionConfig": {
+    "transformConfigs": [
+      {
+        "columnName": "ts",
+        "transformFunction": "fromEpochDays(DaysSinceEpoch)"
+      },
+      {
+        "columnName": "tsRaw",
+        "transformFunction": "fromEpochDays(DaysSinceEpoch)"
+      }
+    ]
   }
 }
diff --git 
a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
index 97757af519..563e5caa19 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
@@ -330,6 +330,18 @@
       "dataType": "INT",
       "format": "1:DAYS:EPOCH",
       "granularity": "1:DAYS"
+    },
+    {
+      "name": "ts",
+      "dataType": "TIMESTAMP",
+      "format": "1:MILLISECONDS:TIMESTAMP",
+      "granularity": "1:SECONDS"
+    },
+    {
+      "name": "tsRaw",
+      "dataType": "TIMESTAMP",
+      "format": "1:MILLISECONDS:TIMESTAMP",
+      "granularity": "1:SECONDS"
     }
   ],
   "schemaName": "airlineStats"
diff --git 
a/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.avro
 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.avro
similarity index 100%
rename from 
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.avro
rename to 
pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.avro
diff --git 
a/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.json
 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.json
similarity index 100%
rename from 
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.json
rename to 
pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.json
diff --git 
a/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.orc
 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.orc
similarity index 100%
rename from 
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.orc
rename to 
pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.orc
diff --git 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
index 6742b2177b..e2420e12ab 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
@@ -8,9 +8,25 @@
     "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
     "schemaName": "meetupRsvp",
     "replication": "1",
-    "replicasPerPartition": "1"
+    "replicasPerPartition": "1",
+    "retentionTimeUnit": "DAYS",
+    "retentionTimeValue": "1"
   },
   "tenants": {},
+  "fieldConfigList": [
+    {
+      "name": "mtime",
+      "encodingType": "DICTIONARY",
+      "indexTypes": ["TIMESTAMP"],
+      "timestampConfig": {
+        "granularities": [
+          "DAY",
+          "WEEK",
+          "MONTH"
+        ]
+      }
+    }
+  ],
   "tableIndexConfig": {
     "loadMode": "MMAP",
     "streamConfigs": {
diff --git 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
index 1a144425a2..41379c2569 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
@@ -8,9 +8,25 @@
     "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
     "schemaName": "meetupRsvp",
     "replication": "1",
-    "replicasPerPartition": "1"
+    "replicasPerPartition": "1",
+    "retentionTimeUnit": "DAYS",
+    "retentionTimeValue": "1"
   },
   "tenants": {},
+  "fieldConfigList": [
+    {
+      "name": "mtime",
+      "encodingType": "DICTIONARY",
+      "indexTypes": ["TIMESTAMP"],
+      "timestampConfig": {
+        "granularities": [
+          "DAY",
+          "WEEK",
+          "MONTH"
+        ]
+      }
+    }
+  ],
   "tableIndexConfig": {
     "loadMode": "MMAP",
     "streamConfigs": {
diff --git 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_realtime_table_config.json
similarity index 57%
copy from 
pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
copy to 
pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_realtime_table_config.json
index 7b20d696e1..43662de8fd 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_realtime_table_config.json
@@ -1,5 +1,5 @@
 {
-  "tableName": "meetupRsvp",
+  "tableName": "meetupRsvpComplexType",
   "tableType": "REALTIME",
   "tenants": {},
   "segmentsConfig": {
@@ -16,7 +16,7 @@
         {
           "streamType": "kafka",
           "stream.kafka.consumer.type": "lowLevel",
-          "stream.kafka.topic.name": "meetupRSVPEvents",
+          "stream.kafka.topic.name": "meetupRSVPComplexTypeEvents",
           "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
           "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
           "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
@@ -26,38 +26,15 @@
       ]
     },
     "transformConfigs": [
-      {
-        "columnName": "event_json",
-        "transformFunction": "jsonFormat(event)"
-      },
-      {
-        "columnName": "group_json",
-        "transformFunction": "jsonFormat(\"group\")"
-      },
-      {
-        "columnName": "member_json",
-        "transformFunction": "jsonFormat(member)"
-      },
-      {
-        "columnName": "venue_json",
-        "transformFunction": "jsonFormat(venue)"
-      }
-    ]
+    ],
+    "complexTypeConfig": {
+      "fieldsToUnnest": [
+        "group.group_topics"
+      ]
+    }
   },
   "tableIndexConfig": {
-    "loadMode": "MMAP",
-    "noDictionaryColumns": [
-      "event_json",
-      "group_json",
-      "member_json",
-      "venue_json"
-    ],
-    "jsonIndexColumns": [
-      "event_json",
-      "group_json",
-      "member_json",
-      "venue_json"
-    ]
+    "loadMode": "MMAP"
   },
   "metadata": {
     "customConfigs": {}
diff --git 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_schema.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_schema.json
new file mode 100644
index 0000000000..0463c01cc1
--- /dev/null
+++ 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_schema.json
@@ -0,0 +1,63 @@
+{
+  "schemaName": "meetupRsvpComplexType",
+  "dimensionFieldSpecs": [
+    {
+      "name": "group.group_topics.urlkey",
+      "dataType": "STRING"
+    },
+    {
+      "name": "group.group_topics.topic_name",
+      "dataType": "STRING"
+    },
+    {
+      "name": "group.group_city",
+      "dataType": "STRING"
+    },
+    {
+      "name": "group.group_country",
+      "dataType": "STRING"
+    },
+    {
+      "name": "group.group_id",
+      "dataType": "INT"
+    },
+    {
+      "name": "group.group_name",
+      "dataType": "STRING"
+    },
+    {
+      "name": "group.group_lon",
+      "dataType": "DOUBLE"
+    },
+    {
+      "name": "group.group_urlname",
+      "dataType": "STRING"
+    },
+    {
+      "name": "group.group_state",
+      "dataType": "STRING"
+    },
+    {
+      "name": "group.group_lat",
+      "dataType": "DOUBLE"
+    },
+    {
+      "name": "rsvp_id",
+      "dataType": "LONG"
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "name": "guests",
+      "dataType": "INT"
+    }
+  ],
+  "dateTimeFieldSpecs": [
+    {
+      "name": "mtime",
+      "dataType": "LONG",
+      "format": "1:MILLISECONDS:EPOCH",
+      "granularity": "1:MILLISECONDS"
+    }
+  ]
+}
diff --git 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_realtime_table_config.json
similarity index 94%
rename from 
pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
rename to 
pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_realtime_table_config.json
index 7b20d696e1..6ab8dd4f36 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_realtime_table_config.json
@@ -1,5 +1,5 @@
 {
-  "tableName": "meetupRsvp",
+  "tableName": "meetupRsvpJson",
   "tableType": "REALTIME",
   "tenants": {},
   "segmentsConfig": {
@@ -16,7 +16,7 @@
         {
           "streamType": "kafka",
           "stream.kafka.consumer.type": "lowLevel",
-          "stream.kafka.topic.name": "meetupRSVPEvents",
+          "stream.kafka.topic.name": "meetupRSVPJsonEvents",
           "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
           "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
           "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
diff --git 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_schema.json
similarity index 95%
rename from 
pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
rename to 
pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_schema.json
index afeee95f6b..23753ffb02 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_schema.json
@@ -1,5 +1,5 @@
 {
-  "schemaName": "meetupRsvp",
+  "schemaName": "meetupRsvpJson",
   "dimensionFieldSpecs": [
     {
       "name": "event_json",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to