This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e6c2f5bd3b Refactor upsert quickstarts (#9246)
e6c2f5bd3b is described below

commit e6c2f5bd3ba02d87a329fde99882a6c01d69894b
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Fri Aug 19 14:42:06 2022 -0700

    Refactor upsert quickstarts (#9246)
---
 .../pinot/tools/PartialUpsertQuickStart.java       | 75 +++----------------
 .../org/apache/pinot/tools/QuickStartBase.java     | 83 +++++++++++++++++++---
 .../apache/pinot/tools/UpsertJsonQuickStart.java   | 63 ++--------------
 .../org/apache/pinot/tools/UpsertQuickStart.java   | 59 ++-------------
 .../pinot/tools/streams/RsvpSourceGenerator.java   |  6 +-
 ...eHandling_meetupRsvp_realtime_table_config.json | 42 -----------
 .../complexTypeHandling_meetupRsvp_schema.json     | 63 ----------------
 ...psertJsonMeetupRsvp_realtime_table_config.json} |  4 +-
 .../upsertJsonMeetupRsvp_schema.json}              |  2 +-
 .../upsertMeetupRsvp_realtime_table_config.json}   |  4 +-
 .../upsertMeetupRsvp_schema.json}                  |  2 +-
 ...rtPartialMeetupRsvp_realtime_table_config.json} |  4 +-
 .../upsertPartialMeetupRsvp_schema.json}           |  2 +-
 13 files changed, 108 insertions(+), 301 deletions(-)

diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
index a581d5c89a..2a35478ad5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
@@ -18,21 +18,14 @@
  */
 package org.apache.pinot.tools;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
 
-public class PartialUpsertQuickStart extends QuickStartBase {
+public class PartialUpsertQuickStart extends RealtimeQuickStart {
 
   public static void main(String[] args)
       throws Exception {
@@ -41,64 +34,15 @@ public class PartialUpsertQuickStart extends QuickStartBase {
   }
 
   @Override
-  public List<String> types() {
-    return Arrays.asList("PARTIAL-UPSERT", "PARTIAL_UPSERT");
-  }
-
-  // Todo: add a quick start demo
-  public void execute()
+  public void runSampleQueries(QuickstartRunner runner)
       throws Exception {
-    File quickstartTmpDir = new File(FileUtils.getTempDirectory(), 
String.valueOf(System.currentTimeMillis()));
-    File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp");
-    File dataDir = new File(bootstrapTableDir, "data");
-    Preconditions.checkState(dataDir.mkdirs());
-
-    File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json");
-    File tableConfigFile = new File(bootstrapTableDir, 
"meetupRsvp_realtime_table_config.json");
-
-    ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = 
classLoader.getResource("examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, schemaFile);
-    resource =
-        
classLoader.getResource("examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, tableConfigFile);
-
-    QuickstartTableRequest request = new 
QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
-    final QuickstartRunner runner =
-        new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, 
getConfigOverrides());
-
-    startKafka();
-    _kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
-    printStatus(Color.CYAN, "***** Starting  meetup data stream and publishing 
to Kafka *****");
-    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
-    meetupRSVPProvider.run();
-    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and 
broker *****");
-    runner.startAll();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-      try {
-        printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
-        runner.stop();
-        meetupRSVPProvider.stopPublishing();
-        FileUtils.deleteDirectory(quickstartTmpDir);
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }));
-    printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****");
-    runner.bootstrapTable();
-    printStatus(Color.CYAN, "***** Waiting for 15 seconds for a few events to 
get populated *****");
-    Thread.sleep(15000);
-
-    printStatus(Color.YELLOW, "***** Upsert quickstart setup complete *****");
-
     // The expected behavior for total number of documents per PK should be 1.
     // The expected behavior for total number of rsvp_counts per PK should >=1 
since it's incremented and updated.
+
     // The expected behavior for nums of values in group_name fields should 
equals to rsvp_counts.
     String q1 =
-        "select event_id, count(*), sum(rsvp_count) from meetupRsvp group by 
event_id order by sum(rsvp_count) desc "
-            + "limit 10";
+        "select event_id, count(*), sum(rsvp_count) from 
upsertPartialMeetupRsvp group by event_id order by sum"
+            + "(rsvp_count) desc limit 10";
     printStatus(Color.YELLOW, "Total number of documents, total number of 
rsvp_counts per event_id in the table");
     printStatus(Color.YELLOW, "***** The expected behavior for total number of 
documents per PK should be 1 *****");
     printStatus(Color.YELLOW,
@@ -112,8 +56,8 @@ public class PartialUpsertQuickStart extends QuickStartBase {
 
     // The expected behavior for nums of values in group_name fields should 
equals to rsvp_counts.
     String q2 =
-        "select event_id, group_name, venue_name, rsvp_count from meetupRsvp 
where rsvp_count > 1 order by rsvp_count"
-            + " desc limit 10";
+        "select event_id, group_name, venue_name, rsvp_count from 
upsertPartialMeetupRsvp where rsvp_count > 1 order "
+            + "by rsvp_count desc limit 10";
     printStatus(Color.YELLOW, "Event_id, group_name, venue_name, rsvp_count 
per per event_id in the table");
     printStatus(Color.YELLOW,
         "***** Nums of values in group_name fields should less than or equals 
to rsvp_count. Duplicate records are "
@@ -123,7 +67,10 @@ public class PartialUpsertQuickStart extends QuickStartBase {
     printStatus(Color.CYAN, "Query : " + q2);
     printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q2)));
     printStatus(Color.GREEN, 
"***************************************************");
+  }
 
-    printStatus(Color.GREEN, "You can always go to http://localhost:9000 to 
play around in the query console");
+  @Override
+  public List<String> types() {
+    return Arrays.asList("PARTIAL-UPSERT", "PARTIAL_UPSERT");
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
index bab8c768cb..efd2510c21 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.tools;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import java.io.File;
@@ -41,6 +42,7 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.AirlineDataStream;
 import org.apache.pinot.tools.streams.MeetupRsvpStream;
+import org.apache.pinot.tools.streams.RsvpSourceGenerator;
 import org.apache.pinot.tools.utils.JarUtils;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.apache.pinot.tools.utils.PinotConfigUtils;
@@ -73,12 +75,16 @@ public abstract class QuickStartBase {
       "examples/batch/githubComplexTypeEvents"
   };
 
-  protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES 
= ImmutableMap.of(
-      "airlineStats", "examples/stream/airlineStats",
-      "githubEvents", "examples/minions/stream/githubEvents",
-      "meetupRsvp", "examples/stream/meetupRsvp",
-      "meetupRsvpJson", "examples/stream/meetupRsvpJson",
-      "meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType");
+  protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES 
= ImmutableMap.<String, String>builder()
+      .put("airlineStats", "examples/stream/airlineStats")
+      .put("githubEvents", "examples/minions/stream/githubEvents")
+      .put("meetupRsvp", "examples/stream/meetupRsvp")
+      .put("meetupRsvpJson", "examples/stream/meetupRsvpJson")
+      .put("meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType")
+      .put("upsertMeetupRsvp", "examples/stream/upsertMeetupRsvp")
+      .put("upsertJsonMeetupRsvp", "examples/stream/upsertJsonMeetupRsvp")
+      .put("upsertPartialMeetupRsvp", 
"examples/stream/upsertPartialMeetupRsvp")
+      .build();
 
   protected File _dataDir = FileUtils.getTempDirectory();
   protected String[] _bootstrapDataDirs;
@@ -270,14 +276,14 @@ public abstract class QuickStartBase {
       JsonNode columns = 
response.get("resultTable").get("dataSchema").get("columnNames");
       int numColumns = columns.size();
       for (int i = 0; i < numColumns; i++) {
-        responseBuilder.append(columns.get(i).asText()).append(TAB);
+        responseBuilder.append(jsonNode2String(columns.get(i))).append(TAB);
       }
       responseBuilder.append(NEW_LINE);
       JsonNode rows = response.get("resultTable").get("rows");
       for (int i = 0; i < rows.size(); i++) {
         JsonNode row = rows.get(i);
         for (int j = 0; j < numColumns; j++) {
-          responseBuilder.append(row.get(j).asText()).append(TAB);
+          responseBuilder.append(jsonNode2String(row.get(j))).append(TAB);
         }
         responseBuilder.append(NEW_LINE);
       }
@@ -285,6 +291,22 @@ public abstract class QuickStartBase {
     return responseBuilder.toString();
   }
 
+  private static String jsonNode2String(JsonNode jsonNode) {
+    if (jsonNode instanceof ArrayNode) {
+      ArrayNode arrayNode = (ArrayNode) jsonNode;
+      String result = "[";
+      for (int i = 0; i < arrayNode.size() - 1; i++) {
+        result += jsonNode2String(arrayNode.get(i)) + ", ";
+      }
+      if (arrayNode.size() > 0) {
+        result += jsonNode2String(arrayNode.get(arrayNode.size() - 1));
+      }
+      result += "]";
+      return result;
+    }
+    return jsonNode.asText();
+  }
+
   protected Map<String, String> getDefaultStreamTableDirectories() {
     return DEFAULT_STREAM_TABLE_DIRECTORIES;
   }
@@ -400,6 +422,51 @@ public abstract class QuickStartBase {
             }
           }));
           break;
+        case "upsertMeetupRsvp":
+          kafkaStarter.createTopic("upsertMeetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
+          printStatus(Quickstart.Color.CYAN,
+              "***** Starting upsertMeetupRSVPEvents data stream and 
publishing to Kafka *****");
+          MeetupRsvpStream upsertMeetupRsvpProvider =
+              new MeetupRsvpStream("upsertMeetupRSVPEvents", 
RsvpSourceGenerator.KeyColumn.EVENT_ID);
+          upsertMeetupRsvpProvider.run();
+          Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+              upsertMeetupRsvpProvider.stopPublishing();
+            } catch (Exception e) {
+              e.printStackTrace();
+            }
+          }));
+          break;
+        case "upsertJsonMeetupRsvp":
+          kafkaStarter.createTopic("upsertJsonMeetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
+          printStatus(Quickstart.Color.CYAN,
+              "***** Starting upsertJsonMeetupRSVPEvents data stream and 
publishing to Kafka *****");
+          MeetupRsvpStream upsertJsonMeetupRsvpProvider =
+              new MeetupRsvpStream("upsertJsonMeetupRSVPEvents", 
RsvpSourceGenerator.KeyColumn.RSVP_ID);
+          upsertJsonMeetupRsvpProvider.run();
+          Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+              upsertJsonMeetupRsvpProvider.stopPublishing();
+            } catch (Exception e) {
+              e.printStackTrace();
+            }
+          }));
+          break;
+        case "upsertPartialMeetupRsvp":
+          kafkaStarter.createTopic("upsertPartialMeetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
+          printStatus(Quickstart.Color.CYAN,
+              "***** Starting upsertPartialMeetupRSVPEvents data stream and 
publishing to Kafka *****");
+          MeetupRsvpStream upsertPartialMeetupRsvpProvider =
+              new MeetupRsvpStream("upsertPartialMeetupRSVPEvents", 
RsvpSourceGenerator.KeyColumn.EVENT_ID);
+          upsertPartialMeetupRsvpProvider.run();
+          Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+              upsertPartialMeetupRsvpProvider.stopPublishing();
+            } catch (Exception e) {
+              e.printStackTrace();
+            }
+          }));
+          break;
         case "githubEvents":
           kafkaStarter.createTopic("githubEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
           printStatus(Quickstart.Color.CYAN, "***** Starting githubEvents data 
stream and publishing to Kafka *****");
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
index 22af89ff27..27d4285c76 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
@@ -18,23 +18,15 @@
  */
 package org.apache.pinot.tools;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.commons.io.FileUtils;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.streams.RsvpSourceGenerator;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
 
-public class UpsertJsonQuickStart extends QuickStartBase {
+public class UpsertJsonQuickStart extends RealtimeQuickStart {
   @Override
   public List<String> types() {
     return Arrays.asList("UPSERT_JSON_INDEX", "UPSERT-JSON-INDEX");
@@ -48,59 +40,16 @@ public class UpsertJsonQuickStart extends QuickStartBase {
     PinotAdministrator.main(arguments.toArray(new String[0]));
   }
 
-  public void execute()
+  @Override
+  public void runSampleQueries(QuickstartRunner runner)
       throws Exception {
-    File quickstartTmpDir = new File(_dataDir, 
String.valueOf(System.currentTimeMillis()));
-    File baseDir = new File(quickstartTmpDir, "meetupRsvp");
-    File dataDir = new File(baseDir, "data");
-    Preconditions.checkState(dataDir.mkdirs());
-
-    File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
-    File tableConfigFile = new File(baseDir, 
"meetupRsvp_realtime_table_config.json");
-
-    ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = 
classLoader.getResource("examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, schemaFile);
-    resource = 
classLoader.getResource("examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json");
-    Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, tableConfigFile);
-
-    QuickstartTableRequest request = new 
QuickstartTableRequest(baseDir.getAbsolutePath());
-    QuickstartRunner runner =
-        new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, 
getConfigOverrides());
-
-    startKafka();
-    _kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
-    printStatus(Color.CYAN, "***** Starting meetup data stream and publishing 
to Kafka *****");
-    MeetupRsvpStream meetupRSVPProvider = new 
MeetupRsvpStream(RsvpSourceGenerator.KeyColumn.RSVP_ID);
-    meetupRSVPProvider.run();
-    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and 
broker *****");
-    runner.startAll();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-      try {
-        printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
-        runner.stop();
-        meetupRSVPProvider.stopPublishing();
-        FileUtils.deleteDirectory(quickstartTmpDir);
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }));
-    printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
-    runner.bootstrapTable();
-    printStatus(Color.CYAN, "***** Waiting for 20 seconds for a few events to 
get populated *****");
-    Thread.sleep(20000);
-
     printStatus(Color.YELLOW, "***** Upsert json-index quickstart setup 
complete *****");
-
-    String q1 = "select json_extract_scalar(event_json, '$.event_name', 
'STRING') from meetupRsvp where json_match"
-        + "(group_json, '\"$.group_topics[*].topic_name\"=''Fitness''') limit 
10";
+    String q1 =
+        "select json_extract_scalar(event_json, '$.event_name', 'STRING') from 
upsertJsonMeetupRsvp where json_match"
+            + "(group_json, 
'\"$.group_topics[*].topic_name\"=''topic_name0''') limit 10";
     printStatus(Color.YELLOW, "Events related to fitness");
     printStatus(Color.CYAN, "Query : " + q1);
     printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
-
     printStatus(Color.GREEN, 
"***************************************************");
-    printStatus(Color.GREEN, "You can always go to http://localhost:9000 to 
play around in the query console");
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
index d67bbcdaf6..f8dfb3953b 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
@@ -18,23 +18,16 @@
  */
 package org.apache.pinot.tools;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import org.apache.commons.io.FileUtils;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
 
-public class UpsertQuickStart extends QuickStartBase {
+public class UpsertQuickStart extends RealtimeQuickStart {
   @Override
   public List<String> types() {
     return Collections.singletonList("UPSERT");
@@ -48,58 +41,14 @@ public class UpsertQuickStart extends QuickStartBase {
     PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
   }
 
-  public void execute()
+  @Override
+  public void runSampleQueries(QuickstartRunner runner)
       throws Exception {
-    File quickstartTmpDir = new File(_dataDir, 
String.valueOf(System.currentTimeMillis()));
-    File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp");
-    File dataDir = new File(bootstrapTableDir, "data");
-    Preconditions.checkState(dataDir.mkdirs());
-
-    File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json");
-    File tableConfigFile = new File(bootstrapTableDir, 
"meetupRsvp_realtime_table_config.json");
-
-    ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = 
classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json");
-    com.google.common.base.Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, schemaFile);
-    resource = 
classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json");
-    com.google.common.base.Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, tableConfigFile);
-
-    QuickstartTableRequest request = new 
QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
-    final QuickstartRunner runner
-        = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, 
dataDir, getConfigOverrides());
-
-    startKafka();
-    _kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
-    printStatus(Color.CYAN, "***** Starting  meetup data stream and publishing 
to Kafka *****");
-    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
-    meetupRSVPProvider.run();
-    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and 
broker *****");
-    runner.startAll();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-      try {
-        printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
-        runner.stop();
-        meetupRSVPProvider.stopPublishing();
-        FileUtils.deleteDirectory(quickstartTmpDir);
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }));
-    printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****");
-    runner.bootstrapTable();
-    printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to 
get populated *****");
-    Thread.sleep(5000);
-
     printStatus(Color.YELLOW, "***** Upsert quickstart setup complete *****");
-
-    String q1 = "select event_id, count(*) from meetupRsvp group by event_id 
limit 10";
+    String q1 = "select event_id, count(*) from upsertMeetupRsvp group by 
event_id limit 10";
     printStatus(Color.YELLOW, "Total number of documents per event_id in the 
table");
     printStatus(Color.CYAN, "Query : " + q1);
     printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
     printStatus(Color.GREEN, 
"***************************************************");
-
-    printStatus(Color.GREEN, "You can always go to http://localhost:9000 to 
play around in the query console");
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
index cf04a9acbe..a1474d18f5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
@@ -49,7 +49,7 @@ public class RsvpSourceGenerator implements PinotSourceDataGenerator {
   }
 
   public RSVP createMessage() {
-    String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + "";
+    String eventId = Math.abs(ThreadLocalRandom.current().nextLong(100)) + "";
     ObjectNode json = JsonUtils.newObjectNode();
     ObjectNode eventJson = JsonUtils.newObjectNode();
     json.set("event", eventJson);
@@ -104,10 +104,10 @@ public class RsvpSourceGenerator implements PinotSourceDataGenerator {
 
     json.put("mtime", DateTime.now().getMillis());
 
-    json.put("rsvp_id", ThreadLocalRandom.current().nextLong());
+    json.put("rsvp_id", ThreadLocalRandom.current().nextLong(100));
     json.put("guests", ThreadLocalRandom.current().nextInt(100));
 
-    json.put("rsvp_count", 1);
+    json.put("rsvp_count", ThreadLocalRandom.current().nextInt(10) + 1);
     return new RSVP(eventId, eventId, json);
   }
 
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json
deleted file mode 100644
index b23f595fba..0000000000
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json
+++ /dev/null
@@ -1,42 +0,0 @@
-{
-  "tableName": "meetupRsvp",
-  "tableType": "REALTIME",
-  "tenants": {},
-  "segmentsConfig": {
-    "timeColumnName": "mtime",
-    "timeType": "MILLISECONDS",
-    "segmentPushType": "APPEND",
-    "replicasPerPartition": "1",
-    "retentionTimeUnit": "DAYS",
-    "retentionTimeValue": "1"
-  },
-  "ingestionConfig": {
-    "streamIngestionConfig": {
-      "streamConfigMaps": [
-        {
-          "streamType": "kafka",
-          "stream.kafka.consumer.type": "lowLevel",
-          "stream.kafka.topic.name": "meetupRSVPEvents",
-          "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
-          "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
-          "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
-          "stream.kafka.zk.broker.url": "localhost:2191/kafka",
-          "stream.kafka.broker.list": "localhost:19092"
-        }
-      ]
-    },
-    "transformConfigs": [
-    ],
-    "complexTypeConfig": {
-      "fieldsToUnnest": [
-        "group.group_topics"
-      ]
-    }
-  },
-  "tableIndexConfig": {
-    "loadMode": "MMAP"
-  },
-  "metadata": {
-    "customConfigs": {}
-  }
-}
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json
deleted file mode 100644
index 7259f30204..0000000000
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json
+++ /dev/null
@@ -1,63 +0,0 @@
-{
-  "schemaName": "meetupRsvp",
-  "dimensionFieldSpecs": [
-    {
-      "name": "group.group_topics.urlkey",
-      "dataType": "STRING"
-    },
-    {
-      "name": "group.group_topics.topic_name",
-      "dataType": "STRING"
-    },
-    {
-      "name": "group.group_city",
-      "dataType": "STRING"
-    },
-    {
-      "name": "group.group_country",
-      "dataType": "STRING"
-    },
-    {
-      "name": "group.group_id",
-      "dataType": "INT"
-    },
-    {
-      "name": "group.group_name",
-      "dataType": "STRING"
-    },
-    {
-      "name": "group.group_lon",
-      "dataType": "DOUBLE"
-    },
-    {
-      "name": "group.group_urlname",
-      "dataType": "STRING"
-    },
-    {
-      "name": "group.group_state",
-      "dataType": "STRING"
-    },
-    {
-      "name": "group.group_lat",
-      "dataType": "DOUBLE"
-    },
-    {
-      "name": "rsvp_id",
-      "dataType": "LONG"
-    }
-  ],
-  "metricFieldSpecs": [
-    {
-      "name": "guests",
-      "dataType": "INT"
-    }
-  ],
-  "dateTimeFieldSpecs": [
-    {
-      "name": "mtime",
-      "dataType": "LONG",
-      "format": "1:MILLISECONDS:EPOCH",
-      "granularity": "1:MILLISECONDS"
-    }
-  ]
-}
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_realtime_table_config.json
similarity index 95%
rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json
rename to pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_realtime_table_config.json
index 149587ee21..baa77af751 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_realtime_table_config.json
@@ -1,5 +1,5 @@
 {
-  "tableName": "meetupRsvp",
+  "tableName": "upsertJsonMeetupRsvp",
   "tableType": "REALTIME",
   "tenants": {},
   "segmentsConfig": {
@@ -16,7 +16,7 @@
         {
           "streamType": "kafka",
           "stream.kafka.consumer.type": "lowLevel",
-          "stream.kafka.topic.name": "meetupRSVPEvents",
+          "stream.kafka.topic.name": "upsertJsonMeetupRSVPEvents",
           "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
           "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
           "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_schema.json
similarity index 95%
rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json
rename to pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_schema.json
index 9112c73b3c..7085802479 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json
+++ b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_schema.json
@@ -1,5 +1,5 @@
 {
-  "schemaName": "meetupRsvp",
+  "schemaName": "upsertJsonMeetupRsvp",
   "dimensionFieldSpecs": [
     {
       "name": "event_json",
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
similarity index 94%
rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
rename to pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
index aecdde0d28..d29431c683 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
@@ -1,5 +1,5 @@
 {
-  "tableName": "meetupRsvp",
+  "tableName": "upsertMeetupRsvp",
   "tableType": "REALTIME",
   "segmentsConfig": {
     "timeColumnName": "mtime",
@@ -21,7 +21,7 @@
     "streamConfigs": {
       "streamType": "kafka",
       "stream.kafka.consumer.type": "lowLevel",
-      "stream.kafka.topic.name": "meetupRSVPEvents",
+      "stream.kafka.topic.name": "upsertMeetupRSVPEvents",
       "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
       "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
       "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
similarity index 97%
rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json
rename to pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
index 4c88f5d945..bb69cc2093 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json
+++ b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
@@ -60,7 +60,7 @@
       "granularity": "1:MILLISECONDS"
     }
   ],
-  "schemaName": "meetupRsvp",
+  "schemaName": "upsertMeetupRsvp",
   "primaryKeyColumns": [
     "event_id"
   ]
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_realtime_table_config.json
similarity index 93%
rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
rename to pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_realtime_table_config.json
index 647c6f4080..2ebf3bb379 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_realtime_table_config.json
@@ -1,5 +1,5 @@
 {
-  "tableName": "meetupRsvp",
+  "tableName": "upsertPartialMeetupRsvp",
   "tableType": "REALTIME",
   "segmentsConfig": {
     "timeColumnName": "mtime",
@@ -18,7 +18,7 @@
     "streamConfigs": {
       "streamType": "kafka",
       "stream.kafka.consumer.type": "lowLevel",
-      "stream.kafka.topic.name": "meetupRSVPEvents",
+      "stream.kafka.topic.name": "upsertPartialMeetupRSVPEvents",
       "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
       "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
       "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_schema.json
similarity index 96%
rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json
rename to pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_schema.json
index ba144ce7d5..b17d454b0d 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json
+++ b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_schema.json
@@ -62,7 +62,7 @@
       "granularity": "1:MILLISECONDS"
     }
   ],
-  "schemaName": "meetupRsvp",
+  "schemaName": "upsertPartialMeetupRsvp",
   "primaryKeyColumns": [
     "event_id"
   ]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to