This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e6c2f5bd3b Refactor upsert quickstarts (#9246) e6c2f5bd3b is described below commit e6c2f5bd3ba02d87a329fde99882a6c01d69894b Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Fri Aug 19 14:42:06 2022 -0700 Refactor upsert quickstarts (#9246) --- .../pinot/tools/PartialUpsertQuickStart.java | 75 +++---------------- .../org/apache/pinot/tools/QuickStartBase.java | 83 +++++++++++++++++++--- .../apache/pinot/tools/UpsertJsonQuickStart.java | 63 ++-------------- .../org/apache/pinot/tools/UpsertQuickStart.java | 59 ++------------- .../pinot/tools/streams/RsvpSourceGenerator.java | 6 +- ...eHandling_meetupRsvp_realtime_table_config.json | 42 ----------- .../complexTypeHandling_meetupRsvp_schema.json | 63 ---------------- ...psertJsonMeetupRsvp_realtime_table_config.json} | 4 +- .../upsertJsonMeetupRsvp_schema.json} | 2 +- .../upsertMeetupRsvp_realtime_table_config.json} | 4 +- .../upsertMeetupRsvp_schema.json} | 2 +- ...rtPartialMeetupRsvp_realtime_table_config.json} | 4 +- .../upsertPartialMeetupRsvp_schema.json} | 2 +- 13 files changed, 108 insertions(+), 301 deletions(-) diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java index a581d5c89a..2a35478ad5 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java @@ -18,21 +18,14 @@ */ package org.apache.pinot.tools; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import java.io.File; -import java.net.URL; import java.util.Arrays; import java.util.List; -import org.apache.commons.io.FileUtils; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.streams.MeetupRsvpStream; -import org.apache.pinot.tools.utils.KafkaStarterUtils; -public class PartialUpsertQuickStart extends QuickStartBase { +public class PartialUpsertQuickStart extends RealtimeQuickStart { public static void main(String[] args) throws Exception { @@ -41,64 +34,15 @@ public class PartialUpsertQuickStart extends QuickStartBase { } @Override - public List<String> types() { - return Arrays.asList("PARTIAL-UPSERT", "PARTIAL_UPSERT"); - } - - // Todo: add a quick start demo - public void execute() + public void runSampleQueries(QuickstartRunner runner) throws Exception { - File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis())); - File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp"); - File dataDir = new File(bootstrapTableDir, "data"); - Preconditions.checkState(dataDir.mkdirs()); - - File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json"); - File tableConfigFile = new File(bootstrapTableDir, "meetupRsvp_realtime_table_config.json"); - - ClassLoader classLoader = Quickstart.class.getClassLoader(); - URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, schemaFile); - resource = - classLoader.getResource("examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, tableConfigFile); - - QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath()); - final QuickstartRunner runner = - new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, getConfigOverrides()); - - startKafka(); - _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); - printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); - MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true); - meetupRSVPProvider.run(); - printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); - runner.startAll(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); - runner.stop(); - meetupRSVPProvider.stopPublishing(); - FileUtils.deleteDirectory(quickstartTmpDir); - } catch (Exception e) { - e.printStackTrace(); - } - })); - printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****"); - runner.bootstrapTable(); - printStatus(Color.CYAN, "***** Waiting for 15 seconds for a few events to get populated *****"); - Thread.sleep(15000); - - printStatus(Color.YELLOW, "***** Upsert quickstart setup complete *****"); - // The expected behavior for total number of documents per PK should be 1. // The expected behavior for total number of rsvp_counts per PK should >=1 since it's incremented and updated. + // The expected behavior for nums of values in group_name fields should equals to rsvp_counts. String q1 = - "select event_id, count(*), sum(rsvp_count) from meetupRsvp group by event_id order by sum(rsvp_count) desc " - + "limit 10"; + "select event_id, count(*), sum(rsvp_count) from upsertPartialMeetupRsvp group by event_id order by sum" + + "(rsvp_count) desc limit 10"; printStatus(Color.YELLOW, "Total number of documents, total number of rsvp_counts per event_id in the table"); printStatus(Color.YELLOW, "***** The expected behavior for total number of documents per PK should be 1 *****"); printStatus(Color.YELLOW, @@ -112,8 +56,8 @@ public class PartialUpsertQuickStart extends QuickStartBase { // The expected behavior for nums of values in group_name fields should equals to rsvp_counts. String q2 = - "select event_id, group_name, venue_name, rsvp_count from meetupRsvp where rsvp_count > 1 order by rsvp_count" - + " desc limit 10"; + "select event_id, group_name, venue_name, rsvp_count from upsertPartialMeetupRsvp where rsvp_count > 1 order " + + "by rsvp_count desc limit 10"; printStatus(Color.YELLOW, "Event_id, group_name, venue_name, rsvp_count per per event_id in the table"); printStatus(Color.YELLOW, "***** Nums of values in group_name fields should less than or equals to rsvp_count. Duplicate records are " @@ -123,7 +67,10 @@ public class PartialUpsertQuickStart extends QuickStartBase { printStatus(Color.CYAN, "Query : " + q2); printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q2))); printStatus(Color.GREEN, "***************************************************"); + } - printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console"); + @Override + public List<String> types() { + return Arrays.asList("PARTIAL-UPSERT", "PARTIAL_UPSERT"); } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java index bab8c768cb..efd2510c21 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java @@ -19,6 +19,7 @@ package org.apache.pinot.tools; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import java.io.File; @@ -41,6 +42,7 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.admin.command.QuickstartRunner; import org.apache.pinot.tools.streams.AirlineDataStream; import org.apache.pinot.tools.streams.MeetupRsvpStream; +import org.apache.pinot.tools.streams.RsvpSourceGenerator; import org.apache.pinot.tools.utils.JarUtils; import org.apache.pinot.tools.utils.KafkaStarterUtils; import org.apache.pinot.tools.utils.PinotConfigUtils; @@ -73,12 +75,16 @@ public abstract class QuickStartBase { "examples/batch/githubComplexTypeEvents" }; - protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES = ImmutableMap.of( - "airlineStats", "examples/stream/airlineStats", - "githubEvents", "examples/minions/stream/githubEvents", - "meetupRsvp", "examples/stream/meetupRsvp", - "meetupRsvpJson", "examples/stream/meetupRsvpJson", - "meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType"); + protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES = ImmutableMap.<String, String>builder() + .put("airlineStats", "examples/stream/airlineStats") + .put("githubEvents", "examples/minions/stream/githubEvents") + .put("meetupRsvp", "examples/stream/meetupRsvp") + .put("meetupRsvpJson", "examples/stream/meetupRsvpJson") + .put("meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType") + .put("upsertMeetupRsvp", "examples/stream/upsertMeetupRsvp") + .put("upsertJsonMeetupRsvp", "examples/stream/upsertJsonMeetupRsvp") + .put("upsertPartialMeetupRsvp", "examples/stream/upsertPartialMeetupRsvp") + .build(); protected File _dataDir = FileUtils.getTempDirectory(); protected String[] _bootstrapDataDirs; @@ -270,14 +276,14 @@ public abstract class QuickStartBase { JsonNode columns = response.get("resultTable").get("dataSchema").get("columnNames"); int numColumns = columns.size(); for (int i = 0; i < numColumns; i++) { - responseBuilder.append(columns.get(i).asText()).append(TAB); + responseBuilder.append(jsonNode2String(columns.get(i))).append(TAB); } responseBuilder.append(NEW_LINE); JsonNode rows = response.get("resultTable").get("rows"); for (int i = 0; i < rows.size(); i++) { JsonNode row = rows.get(i); for (int j = 0; j < numColumns; j++) { - responseBuilder.append(row.get(j).asText()).append(TAB); + responseBuilder.append(jsonNode2String(row.get(j))).append(TAB); } responseBuilder.append(NEW_LINE); } @@ -285,6 +291,22 @@ public abstract class QuickStartBase { return responseBuilder.toString(); } + private static String jsonNode2String(JsonNode jsonNode) { + if (jsonNode instanceof ArrayNode) { + ArrayNode arrayNode = (ArrayNode) jsonNode; + String result = "["; + for (int i = 0; i < arrayNode.size() - 1; i++) { + result += jsonNode2String(arrayNode.get(i)) + ", "; + } + if (arrayNode.size() > 0) { + result += jsonNode2String(arrayNode.get(arrayNode.size() - 1)); + } + result += "]"; + return result; + } + return jsonNode.asText(); + } + protected Map<String, String> getDefaultStreamTableDirectories() { return DEFAULT_STREAM_TABLE_DIRECTORIES; } @@ -400,6 +422,51 @@ public abstract class QuickStartBase { } })); break; + case "upsertMeetupRsvp": + kafkaStarter.createTopic("upsertMeetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); + printStatus(Quickstart.Color.CYAN, + "***** Starting upsertMeetupRSVPEvents data stream and publishing to Kafka *****"); + MeetupRsvpStream upsertMeetupRsvpProvider = + new MeetupRsvpStream("upsertMeetupRSVPEvents", RsvpSourceGenerator.KeyColumn.EVENT_ID); + upsertMeetupRsvpProvider.run(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + upsertMeetupRsvpProvider.stopPublishing(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + break; + case "upsertJsonMeetupRsvp": + kafkaStarter.createTopic("upsertJsonMeetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); + printStatus(Quickstart.Color.CYAN, + "***** Starting upsertJsonMeetupRSVPEvents data stream and publishing to Kafka *****"); + MeetupRsvpStream upsertJsonMeetupRsvpProvider = + new MeetupRsvpStream("upsertJsonMeetupRSVPEvents", RsvpSourceGenerator.KeyColumn.RSVP_ID); + upsertJsonMeetupRsvpProvider.run(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + upsertJsonMeetupRsvpProvider.stopPublishing(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + break; + case "upsertPartialMeetupRsvp": + kafkaStarter.createTopic("upsertPartialMeetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); + printStatus(Quickstart.Color.CYAN, + "***** Starting upsertPartialMeetupRSVPEvents data stream and publishing to Kafka *****"); + MeetupRsvpStream upsertPartialMeetupRsvpProvider = + new MeetupRsvpStream("upsertPartialMeetupRSVPEvents", RsvpSourceGenerator.KeyColumn.EVENT_ID); + upsertPartialMeetupRsvpProvider.run(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + upsertPartialMeetupRsvpProvider.stopPublishing(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + break; case "githubEvents": kafkaStarter.createTopic("githubEvents", KafkaStarterUtils.getTopicCreationProps(2)); printStatus(Quickstart.Color.CYAN, "***** Starting githubEvents data stream and publishing to Kafka *****"); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java index 22af89ff27..27d4285c76 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java @@ -18,23 +18,15 @@ */ package org.apache.pinot.tools; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import java.io.File; -import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.commons.io.FileUtils; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.streams.MeetupRsvpStream; -import org.apache.pinot.tools.streams.RsvpSourceGenerator; -import org.apache.pinot.tools.utils.KafkaStarterUtils; -public class UpsertJsonQuickStart extends QuickStartBase { +public class UpsertJsonQuickStart extends RealtimeQuickStart { @Override public List<String> types() { return Arrays.asList("UPSERT_JSON_INDEX", "UPSERT-JSON-INDEX"); @@ -48,59 +40,16 @@ public class UpsertJsonQuickStart extends QuickStartBase { PinotAdministrator.main(arguments.toArray(new String[0])); } - public void execute() + @Override + public void runSampleQueries(QuickstartRunner runner) throws Exception { - File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); - File baseDir = new File(quickstartTmpDir, "meetupRsvp"); - File dataDir = new File(baseDir, "data"); - Preconditions.checkState(dataDir.mkdirs()); - - File schemaFile = new File(baseDir, "meetupRsvp_schema.json"); - File tableConfigFile = new File(baseDir, "meetupRsvp_realtime_table_config.json"); - - ClassLoader classLoader = Quickstart.class.getClassLoader(); - URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, schemaFile); - resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, tableConfigFile); - - QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); - QuickstartRunner runner = - new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, getConfigOverrides()); - - startKafka(); - _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); - printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); - MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(RsvpSourceGenerator.KeyColumn.RSVP_ID); - meetupRSVPProvider.run(); - printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); - runner.startAll(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); - runner.stop(); - meetupRSVPProvider.stopPublishing(); - FileUtils.deleteDirectory(quickstartTmpDir); - } catch (Exception e) { - e.printStackTrace(); - } - })); - printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****"); - runner.bootstrapTable(); - printStatus(Color.CYAN, "***** Waiting for 20 seconds for a few events to get populated *****"); - Thread.sleep(20000); - printStatus(Color.YELLOW, "***** Upsert json-index quickstart setup complete *****"); - - String q1 = "select json_extract_scalar(event_json, '$.event_name', 'STRING') from meetupRsvp where json_match" - + "(group_json, '\"$.group_topics[*].topic_name\"=''Fitness''') limit 10"; + String q1 = + "select json_extract_scalar(event_json, '$.event_name', 'STRING') from upsertJsonMeetupRsvp where json_match" + + "(group_json, '\"$.group_topics[*].topic_name\"=''topic_name0''') limit 10"; printStatus(Color.YELLOW, "Events related to fitness"); printStatus(Color.CYAN, "Query : " + q1); printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1))); - printStatus(Color.GREEN, "***************************************************"); - printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console"); } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java index d67bbcdaf6..f8dfb3953b 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java @@ -18,23 +18,16 @@ */ package org.apache.pinot.tools; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import java.io.File; -import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.apache.commons.io.FileUtils; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.streams.MeetupRsvpStream; -import org.apache.pinot.tools.utils.KafkaStarterUtils; -public class UpsertQuickStart extends QuickStartBase { +public class UpsertQuickStart extends RealtimeQuickStart { @Override public List<String> types() { return Collections.singletonList("UPSERT"); @@ -48,58 +41,14 @@ public class UpsertQuickStart extends QuickStartBase { PinotAdministrator.main(arguments.toArray(new String[arguments.size()])); } - public void execute() + @Override + public void runSampleQueries(QuickstartRunner runner) throws Exception { - File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); - File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp"); - File dataDir = new File(bootstrapTableDir, "data"); - Preconditions.checkState(dataDir.mkdirs()); - - File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json"); - File tableConfigFile = new File(bootstrapTableDir, "meetupRsvp_realtime_table_config.json"); - - ClassLoader classLoader = Quickstart.class.getClassLoader(); - URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json"); - com.google.common.base.Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, schemaFile); - resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json"); - com.google.common.base.Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, tableConfigFile); - - QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath()); - final QuickstartRunner runner - = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, getConfigOverrides()); - - startKafka(); - _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); - printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); - MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true); - meetupRSVPProvider.run(); - printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); - runner.startAll(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); - runner.stop(); - meetupRSVPProvider.stopPublishing(); - FileUtils.deleteDirectory(quickstartTmpDir); - } catch (Exception e) { - e.printStackTrace(); - } - })); - printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****"); - runner.bootstrapTable(); - printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****"); - Thread.sleep(5000); - printStatus(Color.YELLOW, "***** Upsert quickstart setup complete *****"); - - String q1 = "select event_id, count(*) from meetupRsvp group by event_id limit 10"; + String q1 = "select event_id, count(*) from upsertMeetupRsvp group by event_id limit 10"; printStatus(Color.YELLOW, "Total number of documents per event_id in the table"); printStatus(Color.CYAN, "Query : " + q1); printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1))); printStatus(Color.GREEN, "***************************************************"); - - printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console"); } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java index cf04a9acbe..a1474d18f5 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java @@ -49,7 +49,7 @@ public class RsvpSourceGenerator implements PinotSourceDataGenerator { } public RSVP createMessage() { - String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + ""; + String eventId = Math.abs(ThreadLocalRandom.current().nextLong(100)) + ""; ObjectNode json = JsonUtils.newObjectNode(); ObjectNode eventJson = JsonUtils.newObjectNode(); json.set("event", eventJson); @@ -104,10 +104,10 @@ public class RsvpSourceGenerator implements PinotSourceDataGenerator { json.put("mtime", DateTime.now().getMillis()); - json.put("rsvp_id", ThreadLocalRandom.current().nextLong()); + json.put("rsvp_id", ThreadLocalRandom.current().nextLong(100)); json.put("guests", ThreadLocalRandom.current().nextInt(100)); - json.put("rsvp_count", 1); + json.put("rsvp_count", ThreadLocalRandom.current().nextInt(10) + 1); return new RSVP(eventId, eventId, json); } diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json deleted file mode 100644 index b23f595fba..0000000000 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json +++ /dev/null @@ -1,42 +0,0 @@ -{ - "tableName": "meetupRsvp", - "tableType": "REALTIME", - "tenants": {}, - "segmentsConfig": { - "timeColumnName": "mtime", - "timeType": "MILLISECONDS", - "segmentPushType": "APPEND", - "replicasPerPartition": "1", - "retentionTimeUnit": "DAYS", - "retentionTimeValue": "1" - }, - "ingestionConfig": { - "streamIngestionConfig": { - "streamConfigMaps": [ - { - "streamType": "kafka", - "stream.kafka.consumer.type": "lowLevel", - "stream.kafka.topic.name": "meetupRSVPEvents", - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", - "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka", - "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", - "stream.kafka.zk.broker.url": "localhost:2191/kafka", - "stream.kafka.broker.list": "localhost:19092" - } - ] - }, - "transformConfigs": [ - ], - "complexTypeConfig": { - "fieldsToUnnest": [ - "group.group_topics" - ] - } - }, - "tableIndexConfig": { - "loadMode": "MMAP" - }, - "metadata": { - "customConfigs": {} - } -} diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json deleted file mode 100644 index 7259f30204..0000000000 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json +++ /dev/null @@ -1,63 +0,0 @@ -{ - "schemaName": "meetupRsvp", - "dimensionFieldSpecs": [ - { - "name": "group.group_topics.urlkey", - "dataType": "STRING" - }, - { - "name": "group.group_topics.topic_name", - "dataType": "STRING" - }, - { - "name": "group.group_city", - "dataType": "STRING" - }, - { - "name": "group.group_country", - "dataType": "STRING" - }, - { - "name": "group.group_id", - "dataType": "INT" - }, - { - "name": "group.group_name", - "dataType": "STRING" - }, - { - "name": "group.group_lon", - "dataType": "DOUBLE" - }, - { - "name": "group.group_urlname", - "dataType": "STRING" - }, - { - "name": "group.group_state", - "dataType": "STRING" - }, - { - "name": "group.group_lat", - "dataType": "DOUBLE" - }, - { - "name": "rsvp_id", - "dataType": "LONG" - } - ], - "metricFieldSpecs": [ - { - "name": "guests", - "dataType": "INT" - } - ], - "dateTimeFieldSpecs": [ - { - "name": "mtime", - "dataType": "LONG", - "format": "1:MILLISECONDS:EPOCH", - "granularity": "1:MILLISECONDS" - } - ] -} diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_realtime_table_config.json similarity index 95% rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json rename to pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_realtime_table_config.json index 149587ee21..baa77af751 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_realtime_table_config.json @@ -1,5 +1,5 @@ { - "tableName": "meetupRsvp", + "tableName": "upsertJsonMeetupRsvp", "tableType": "REALTIME", "tenants": {}, "segmentsConfig": { @@ -16,7 +16,7 @@ { "streamType": "kafka", "stream.kafka.consumer.type": "lowLevel", - "stream.kafka.topic.name": "meetupRSVPEvents", + "stream.kafka.topic.name": "upsertJsonMeetupRSVPEvents", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_schema.json similarity index 95% rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json rename to pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_schema.json index 9112c73b3c..7085802479 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json +++ b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_schema.json @@ -1,5 +1,5 @@ { - "schemaName": "meetupRsvp", + "schemaName": "upsertJsonMeetupRsvp", "dimensionFieldSpecs": [ { "name": "event_json", diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json similarity index 94% rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json rename to pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json index aecdde0d28..d29431c683 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json @@ -1,5 +1,5 @@ { - "tableName": "meetupRsvp", + "tableName": "upsertMeetupRsvp", "tableType": "REALTIME", "segmentsConfig": { "timeColumnName": "mtime", @@ -21,7 +21,7 @@ "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "lowLevel", - "stream.kafka.topic.name": "meetupRSVPEvents", + "stream.kafka.topic.name": "upsertMeetupRSVPEvents", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json similarity index 97% rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json rename to pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json index 4c88f5d945..bb69cc2093 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json +++ b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json @@ -60,7 +60,7 @@ "granularity": "1:MILLISECONDS" } ], - "schemaName": "meetupRsvp", + "schemaName": "upsertMeetupRsvp", "primaryKeyColumns": [ "event_id" ] diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_realtime_table_config.json similarity index 93% rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json rename to pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_realtime_table_config.json index 647c6f4080..2ebf3bb379 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_realtime_table_config.json @@ -1,5 +1,5 @@ { - "tableName": "meetupRsvp", + "tableName": "upsertPartialMeetupRsvp", "tableType": "REALTIME", "segmentsConfig": { "timeColumnName": "mtime", @@ -18,7 +18,7 @@ "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "lowLevel", - "stream.kafka.topic.name": "meetupRSVPEvents", + "stream.kafka.topic.name": "upsertPartialMeetupRSVPEvents", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_schema.json similarity index 96% rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json rename to pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_schema.json index ba144ce7d5..b17d454b0d 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json +++ b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_schema.json @@ -62,7 +62,7 @@ "granularity": "1:MILLISECONDS" } ], - "schemaName": "meetupRsvp", + "schemaName": "upsertPartialMeetupRsvp", "primaryKeyColumns": [ "event_id" ] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org