This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d41ec0bfd4 Refactor stream quickstart (#9227) d41ec0bfd4 is described below commit d41ec0bfd4282ff2121ecb9b1b36af48c1890c1e Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Wed Aug 17 11:24:34 2022 -0700 Refactor stream quickstart (#9227) --- .../kafka20/server/KafkaDataServerStartable.java | 31 ++++ .../org/apache/pinot/tools/GenericQuickstart.java | 29 ---- .../apache/pinot/tools/GitHubEventsQuickstart.java | 28 ++-- .../org/apache/pinot/tools/HybridQuickstart.java | 165 +++++++-------------- .../pinot/tools/PartialUpsertQuickStart.java | 18 +-- .../org/apache/pinot/tools/QuickStartBase.java | 164 ++++++++++++++++++++ .../apache/pinot/tools/QuickstartTableRequest.java | 21 +++ .../RealtimeComplexTypeHandlingQuickStart.java | 83 +---------- .../pinot/tools/RealtimeJsonIndexQuickStart.java | 92 ++---------- .../org/apache/pinot/tools/RealtimeQuickStart.java | 102 +++++-------- .../pinot/tools/RealtimeQuickStartWithMinion.java | 126 ++-------------- .../apache/pinot/tools/UpsertJsonQuickStart.java | 19 +-- .../org/apache/pinot/tools/UpsertQuickStart.java | 19 +-- .../pinot/tools/streams/AirlineDataStream.java | 13 +- .../pinot/tools/streams/MeetupRsvpStream.java | 18 ++- .../pinot/tools/streams/RsvpSourceGenerator.java | 63 +++++++- .../airlineStats_realtime_table_config.json | 26 ++++ .../stream/airlineStats/airlineStats_schema.json | 12 ++ .../airlineStats_data.avro | Bin .../airlineStats_data.json | 0 .../{sample_data => rawdata}/airlineStats_data.orc | Bin .../meetupRsvp_realtime_table_config.json | 18 ++- .../meetupRsvp_realtime_table_config.json | 18 ++- ...etupRsvpComplexType_realtime_table_config.json} | 41 ++--- .../meetupRsvpComplexType_schema.json | 63 ++++++++ .../meetupRsvpJson_realtime_table_config.json} | 4 +- .../meetupRsvpJson_schema.json} | 2 +- 27 files changed, 575 insertions(+), 600 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java index 42bbd9be8d..9bd13c43ae 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.plugin.stream.kafka20.server; +import com.google.common.base.Function; import java.io.File; import java.util.Arrays; import java.util.Collection; @@ -25,6 +26,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.UUID; +import javax.annotation.Nullable; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import org.I0Itec.zkclient.ZkClient; @@ -96,10 +98,39 @@ public class KafkaDataServerStartable implements StreamDataServerStartable { int partition = (Integer) props.get("partition"); Collection<NewTopic> topicList = Arrays.asList(new NewTopic(topic, partition, (short) 1)); _adminClient.createTopics(topicList); + waitForCondition(new Function<Void, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable Void aVoid) { + try { + return _adminClient.listTopics().names().get().contains(topic); + } catch (Exception e) { + LOGGER.warn("Could not fetch Kafka topics", e); + return null; + } + } + }, 1000L, 30000, "Kafka topic " + topic + " is not created yet"); } @Override public int getPort() { return _port; } + + private static void waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs, + @Nullable String errorMessage) { + long endTime = System.currentTimeMillis() + timeoutMs; + String errorMessageSuffix = errorMessage != null ? ", error message: " + errorMessage : ""; + while (System.currentTimeMillis() < endTime) { + try { + if (Boolean.TRUE.equals(condition.apply(null))) { + return; + } + Thread.sleep(checkIntervalMs); + } catch (Exception e) { + LOGGER.error("Caught exception while checking the condition" + errorMessageSuffix, e); + } + } + LOGGER.error("Failed to meet condition in " + timeoutMs + "ms" + errorMessageSuffix); + } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java index 62fc47d6b3..b4b8f295cd 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java @@ -20,11 +20,7 @@ package org.apache.pinot.tools; import java.util.Arrays; import java.util.List; -import org.apache.pinot.common.utils.ZkStarter; -import org.apache.pinot.spi.stream.StreamDataProvider; -import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.utils.KafkaStarterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,24 +39,10 @@ import org.slf4j.LoggerFactory; */ public class GenericQuickstart extends Quickstart { private static final Logger LOGGER = LoggerFactory.getLogger(GenericQuickstart.class); - private StreamDataServerStartable _kafkaStarter; - private ZkStarter.ZookeeperInstance _zookeeperInstance; public GenericQuickstart() { } - private void startKafka() { - _zookeeperInstance = ZkStarter.startLocalZkServer(); - try { - _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, - KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance)); - } catch (Exception e) { - throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); - } - _kafkaStarter.start(); - _kafkaStarter.createTopic("pullRequestMergedEvents", KafkaStarterUtils.getTopicCreationProps(2)); - } - @Override public List<String> types() { return Arrays.asList("GENERIC"); @@ -92,18 +74,7 @@ public class GenericQuickstart extends Quickstart { public void execute() throws Exception { - printStatus(Color.CYAN, "***** Starting Kafka *****"); startKafka(); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - printStatus(Color.GREEN, "***** Shutting down QuickStart cluster *****"); - _kafkaStarter.stop(); - ZkStarter.stopLocalZkServer(_zookeeperInstance); - } catch (Exception e) { - LOGGER.error("Caught exception in shutting down QuickStart cluster", e); - } - })); super.execute(); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java index 47a1a9417f..8765333b96 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java @@ -26,20 +26,16 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.utils.ZkStarter; import org.apache.pinot.spi.stream.StreamDataProvider; import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.command.QuickstartRunner; import org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream; -import org.apache.pinot.tools.utils.KafkaStarterUtils; import org.apache.pinot.tools.utils.KinesisStarterUtils; import org.apache.pinot.tools.utils.StreamSourceType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; - /** * Sets up a demo Pinot cluster with 1 zookeeper, 1 controller, 1 broker and 1 server @@ -50,25 +46,12 @@ import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; public class GitHubEventsQuickstart extends QuickStartBase { private static final Logger LOGGER = LoggerFactory.getLogger(GitHubEventsQuickstart.class); private StreamDataServerStartable _serverStarter; - private ZkStarter.ZookeeperInstance _zookeeperInstance; private String _personalAccessToken; private StreamSourceType _sourceType; public GitHubEventsQuickstart() { } - private void startKafka() { - _zookeeperInstance = ZkStarter.startLocalZkServer(); - try { - _serverStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, - KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance)); - } catch (Exception e) { - throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); - } - _serverStarter.start(); - _serverStarter.createTopic("pullRequestMergedEvents", KafkaStarterUtils.getTopicCreationProps(2)); - } - private void startKinesis() { try { @@ -85,6 +68,15 @@ public class GitHubEventsQuickstart extends QuickStartBase { Properties topicProperties = new Properties(); topicProperties.put(KinesisStarterUtils.NUM_SHARDS, 3); _serverStarter.createTopic("pullRequestMergedEvents", topicProperties); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + printStatus(Color.GREEN, "***** Shutting down Kinesis *****"); + _serverStarter.stop(); + } catch (Exception e) { + LOGGER.error("Caught exception in shutting down Kinesis", e); + } + })); } private void startStreamServer() { @@ -148,8 +140,6 @@ public class GitHubEventsQuickstart extends QuickStartBase { try { printStatus(Color.GREEN, "***** Shutting down GitHubEventsQuickStart *****"); runner.stop(); - _serverStarter.stop(); - ZkStarter.stopLocalZkServer(_zookeeperInstance); FileUtils.deleteDirectory(quickStartDataDir); } catch (Exception e) { LOGGER.error("Caught exception in shutting down GitHubEvents QuickStart", e); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java index 3ef0945a75..c65fa0a347 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java @@ -19,45 +19,27 @@ package org.apache.pinot.tools; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import com.google.common.collect.ImmutableMap; import java.io.File; -import java.io.IOException; -import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.utils.ZkStarter; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.stream.StreamDataProvider; -import org.apache.pinot.spi.stream.StreamDataServerStartable; -import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.streams.AirlineDataStream; -import org.apache.pinot.tools.utils.KafkaStarterUtils; -import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; - -public class HybridQuickstart extends QuickStartBase { +public class HybridQuickstart extends Quickstart { @Override public List<String> types() { return Collections.singletonList("HYBRID"); } - private StreamDataServerStartable _kafkaStarter; - private ZkStarter.ZookeeperInstance _zookeeperInstance; - private File _schemaFile; - private File _realtimeTableConfigFile; - private File _dataFile; - private File _ingestionJobSpecFile; - public static void main(String[] args) throws Exception { List<String> arguments = new ArrayList<>(); @@ -73,97 +55,9 @@ public class HybridQuickstart extends QuickStartBase { return overrides; } - private QuickstartTableRequest prepareTableRequest(File baseDir) - throws IOException { - _schemaFile = new File(baseDir, "airlineStats_schema.json"); - _ingestionJobSpecFile = new File(baseDir, "ingestionJobSpec.yaml"); - File tableConfigFile = new File(baseDir, "airlineStats_offline_table_config.json"); - - ClassLoader classLoader = Quickstart.class.getClassLoader(); - URL resource = classLoader.getResource("examples/batch/airlineStats/airlineStats_schema.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, _schemaFile); - resource = classLoader.getResource("examples/batch/airlineStats/ingestionJobSpec.yaml"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, _ingestionJobSpecFile); - resource = classLoader.getResource("examples/batch/airlineStats/airlineStats_offline_table_config.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, tableConfigFile); - - _realtimeTableConfigFile = new File(baseDir, "airlineStats_realtime_table_config.json"); - resource = Quickstart.class.getClassLoader() - .getResource("examples/stream/airlineStats/airlineStats_realtime_table_config.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, _realtimeTableConfigFile); - resource = Quickstart.class.getClassLoader() - .getResource("examples/stream/airlineStats/sample_data/airlineStats_data.avro"); - Preconditions.checkNotNull(resource); - _dataFile = new File(baseDir, "airlineStats_data.avro"); - FileUtils.copyURLToFile(resource, _dataFile); - - return new QuickstartTableRequest(baseDir.getAbsolutePath()); - } - - private void startKafka() { - _zookeeperInstance = ZkStarter.startLocalZkServer(); - try { - _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, - KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance)); - } catch (Exception e) { - throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); - } - _kafkaStarter.start(); - _kafkaStarter.createTopic("flights-realtime", KafkaStarterUtils.getTopicCreationProps(10)); - } - - public void execute() + @Override + public void runSampleQueries(QuickstartRunner runner) throws Exception { - File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); - File baseDir = new File(quickstartTmpDir, "airlineStats"); - File dataDir = new File(baseDir, "data"); - Preconditions.checkState(dataDir.mkdirs()); - QuickstartTableRequest bootstrapTableRequest = prepareTableRequest(baseDir); - final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(bootstrapTableRequest), - 1, 1, 1, 1, dataDir, getConfigOverrides()); - printStatus(Color.YELLOW, "***** Starting Kafka *****"); - startKafka(); - printStatus(Color.YELLOW, "***** Starting airline data stream and publishing to Kafka *****"); - Schema schema = Schema.fromFile(_schemaFile); - TableConfig tableConfig = JsonUtils.fileToObject(_realtimeTableConfigFile, TableConfig.class); - AirlineDataStream stream = new AirlineDataStream(schema, tableConfig, _dataFile); - stream.run(); - printStatus(Color.YELLOW, "***** Starting Zookeeper, 1 servers, 1 brokers and 1 controller *****"); - runner.startAll(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - printStatus(Color.GREEN, "***** Shutting down hybrid quick start *****"); - runner.stop(); - stream.shutdown(); - _kafkaStarter.stop(); - ZkStarter.stopLocalZkServer(_zookeeperInstance); - FileUtils.deleteDirectory(quickstartTmpDir); - } catch (Exception e) { - e.printStackTrace(); - } - })); - printStatus(Color.YELLOW, "***** Bootstrap airlineStats offline and realtime table *****"); - runner.bootstrapTable(); - - printStatus(Color.YELLOW, "***** Pinot Hybrid with hybrid table setup is complete *****"); - printStatus(Color.YELLOW, "***** Sequence of operations *****"); - printStatus(Color.YELLOW, "***** 1. Started 1 controller instance where tenant creation is enabled *****"); - printStatus(Color.YELLOW, "***** 2. Started 2 servers and 2 brokers *****"); - printStatus(Color.YELLOW, "***** 3. Created a server tenant with 1 offline and 1 realtime instance *****"); - printStatus(Color.YELLOW, "***** 4. Created a broker tenant with 2 instances *****"); - printStatus(Color.YELLOW, "***** 5. Added a schema *****"); - printStatus(Color.YELLOW, - "***** 6. Created an offline and a realtime table with the tenant names created above *****"); - printStatus(Color.YELLOW, "***** 7. Built and pushed an offline segment *****"); - printStatus(Color.YELLOW, - "***** 8. Started publishing a Kafka stream for the realtime instance to start consuming *****"); - printStatus(Color.YELLOW, "***** 9. Sleep 5 Seconds to wait for all components brought up *****"); - Thread.sleep(5000); - String q1 = "select count(*) from airlineStats limit 1"; printStatus(Color.YELLOW, "Total number of documents in the table"); printStatus(Color.CYAN, "Query : " + q1); @@ -198,6 +92,53 @@ public class HybridQuickstart extends QuickStartBase { printStatus(Color.CYAN, "Query : " + q5); printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q5))); printStatus(Color.GREEN, "***************************************************"); + } + + @Override + protected String[] getDefaultBatchTableDirectories() { + return new String[]{"examples/batch/airlineStats"}; + } + + @Override + protected Map<String, String> getDefaultStreamTableDirectories() { + return ImmutableMap.of("airlineStats", "examples/stream/airlineStats"); + } + + public void execute() + throws Exception { + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); + File quickstartRunnerDir = new File(quickstartTmpDir, "quickstart"); + Preconditions.checkState(quickstartRunnerDir.mkdirs()); + Set<QuickstartTableRequest> quickstartTableRequests = new HashSet<>(); + quickstartTableRequests.addAll(bootstrapOfflineTableDirectories(quickstartTmpDir)); + quickstartTableRequests.addAll(bootstrapStreamTableDirectories(quickstartTmpDir)); + final QuickstartRunner runner = + new QuickstartRunner(new ArrayList<>(quickstartTableRequests), 1, 1, 1, 1, quickstartRunnerDir, + getConfigOverrides()); + + startKafka(); + startAllDataStreams(_kafkaStarter, quickstartTmpDir); + + printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker, server and minion *****"); + runner.startAll(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); + runner.stop(); + FileUtils.deleteDirectory(quickstartTmpDir); + } catch (Exception e) { + e.printStackTrace(); + } + })); + + printStatus(Color.CYAN, "***** Bootstrap all tables *****"); + runner.bootstrapTable(); + + printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****"); + Thread.sleep(5000); + + printStatus(Color.YELLOW, "***** Realtime quickstart setup complete *****"); + runSampleQueries(runner); printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console"); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java index 19fd63e72b..a581d5c89a 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java @@ -25,20 +25,14 @@ import java.net.URL; import java.util.Arrays; import java.util.List; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.utils.ZkStarter; import org.apache.pinot.spi.plugin.PluginManager; -import org.apache.pinot.spi.stream.StreamDataProvider; -import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.command.QuickstartRunner; import org.apache.pinot.tools.streams.MeetupRsvpStream; import org.apache.pinot.tools.utils.KafkaStarterUtils; -import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; - public class PartialUpsertQuickStart extends QuickStartBase { - private StreamDataServerStartable _kafkaStarter; public static void main(String[] args) throws Exception { @@ -75,15 +69,7 @@ public class PartialUpsertQuickStart extends QuickStartBase { final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, getConfigOverrides()); - printStatus(Color.CYAN, "***** Starting Kafka *****"); - final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); - try { - _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, - KafkaStarterUtils.getDefaultKafkaConfiguration()); - } catch (Exception e) { - throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); - } - _kafkaStarter.start(); + startKafka(); _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true); @@ -95,8 +81,6 @@ public class PartialUpsertQuickStart extends QuickStartBase { printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); runner.stop(); meetupRSVPProvider.stopPublishing(); - _kafkaStarter.stop(); - ZkStarter.stopLocalZkServer(zookeeperInstance); FileUtils.deleteDirectory(quickstartTmpDir); } catch (Exception e) { e.printStackTrace(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java index 225aeb40fd..bab8c768cb 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java @@ -24,15 +24,25 @@ import com.google.common.collect.ImmutableMap; import java.io.File; import java.io.IOException; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.LineIterator; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.utils.ZkStarter; +import org.apache.pinot.spi.stream.StreamDataProducer; +import org.apache.pinot.spi.stream.StreamDataProvider; +import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.admin.command.QuickstartRunner; +import org.apache.pinot.tools.streams.AirlineDataStream; +import org.apache.pinot.tools.streams.MeetupRsvpStream; import org.apache.pinot.tools.utils.JarUtils; +import org.apache.pinot.tools.utils.KafkaStarterUtils; import org.apache.pinot.tools.utils.PinotConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,10 +73,19 @@ public abstract class QuickStartBase { "examples/batch/githubComplexTypeEvents" }; + protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES = ImmutableMap.of( + "airlineStats", "examples/stream/airlineStats", + "githubEvents", "examples/minions/stream/githubEvents", + "meetupRsvp", "examples/stream/meetupRsvp", + "meetupRsvpJson", "examples/stream/meetupRsvpJson", + "meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType"); + protected File _dataDir = FileUtils.getTempDirectory(); protected String[] _bootstrapDataDirs; protected String _zkExternalAddress; protected String _configFilePath; + protected StreamDataServerStartable _kafkaStarter; + protected ZkStarter.ZookeeperInstance _zookeeperInstance; public QuickStartBase setDataDir(String dataDir) { _dataDir = new File(dataDir); @@ -185,6 +204,25 @@ public abstract class QuickStartBase { } } + protected List<QuickstartTableRequest> bootstrapStreamTableDirectories(File quickstartTmpDir) + throws IOException { + List<QuickstartTableRequest> quickstartTableRequests = new ArrayList<>(); + for (Map.Entry<String, String> entry : getDefaultStreamTableDirectories().entrySet()) { + String tableName = entry.getKey(); + String directory = entry.getValue(); + File baseDir = new File(quickstartTmpDir, tableName); + File dataDir = new File(baseDir, "rawdata"); + dataDir.mkdirs(); + if (useDefaultBootstrapTableDir()) { + copyResourceTableToTmpDirectory(directory, tableName, baseDir, dataDir, true); + } else { + copyFilesystemTableToTmpDirectory(directory, tableName, baseDir); + } + quickstartTableRequests.add(new QuickstartTableRequest(baseDir.getAbsolutePath())); + } + return quickstartTableRequests; + } + private static void copyFilesystemTableToTmpDirectory(String sourcePath, String tableName, File baseDir) throws IOException { File fileDb = new File(sourcePath); @@ -246,4 +284,130 @@ public abstract class QuickStartBase { } return responseBuilder.toString(); } + + protected Map<String, String> getDefaultStreamTableDirectories() { + return DEFAULT_STREAM_TABLE_DIRECTORIES; + } + + protected static void publishStreamDataToKafka(String tableName, File dataDir) + throws Exception { + switch (tableName) { + case "githubEvents": + publishGithubEventsToKafka("githubEvents", new File(dataDir, "/rawdata/2021-07-21-few-hours.json")); + break; + default: + break; + } + } + + protected static void publishGithubEventsToKafka(String topicName, File dataFile) + throws Exception { + Properties properties = new Properties(); + properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER); + properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); + properties.put("request.required.acks", "1"); + StreamDataProducer producer = + StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); + try { + LineIterator dataStream = FileUtils.lineIterator(dataFile); + + while (dataStream.hasNext()) { + producer.produce(topicName, dataStream.nextLine().getBytes(StandardCharsets.UTF_8)); + } + } finally { + producer.close(); + } + } + + protected void startKafka() { + printStatus(Quickstart.Color.CYAN, "***** Starting Kafka *****"); + _zookeeperInstance = ZkStarter.startLocalZkServer(); + try { + _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, + KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance)); + } catch (Exception e) { + throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); + } + _kafkaStarter.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + printStatus(Quickstart.Color.GREEN, "***** Shutting down kafka and zookeeper *****"); + _kafkaStarter.stop(); + ZkStarter.stopLocalZkServer(_zookeeperInstance); + } catch (Exception e) { + e.printStackTrace(); + } + })); + + printStatus(Quickstart.Color.CYAN, "***** Kafka Started *****"); + } + + public void startAllDataStreams(StreamDataServerStartable kafkaStarter, File quickstartTmpDir) + throws Exception { + for (String streamName : getDefaultStreamTableDirectories().keySet()) { + switch (streamName) { + case "airlineStats": + kafkaStarter.createTopic("flights-realtime", KafkaStarterUtils.getTopicCreationProps(10)); + printStatus(Quickstart.Color.CYAN, "***** Starting airlineStats data stream and publishing to Kafka *****"); + AirlineDataStream airlineDataStream = new AirlineDataStream(new File(quickstartTmpDir, "airlineStats")); + airlineDataStream.run(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + airlineDataStream.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + break; + case "meetupRsvp": + kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(10)); + printStatus(Quickstart.Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); + MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(); + meetupRSVPProvider.run(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + meetupRSVPProvider.stopPublishing(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + break; + case "meetupRsvpJson": + kafkaStarter.createTopic("meetupRSVPJsonEvents", KafkaStarterUtils.getTopicCreationProps(10)); + printStatus(Quickstart.Color.CYAN, "***** Starting meetupRsvpJson data stream and publishing to Kafka *****"); + MeetupRsvpStream meetupRSVPJsonProvider = new MeetupRsvpStream("meetupRSVPJsonEvents"); + meetupRSVPJsonProvider.run(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + meetupRSVPJsonProvider.stopPublishing(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + break; + case "meetupRsvpComplexType": + kafkaStarter.createTopic("meetupRSVPComplexTypeEvents", KafkaStarterUtils.getTopicCreationProps(10)); + printStatus(Quickstart.Color.CYAN, + "***** Starting meetupRSVPComplexType data stream and publishing to Kafka *****"); + MeetupRsvpStream meetupRSVPComplexTypeProvider = new MeetupRsvpStream("meetupRSVPComplexTypeEvents"); + meetupRSVPComplexTypeProvider.run(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + meetupRSVPComplexTypeProvider.stopPublishing(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + break; + case "githubEvents": + kafkaStarter.createTopic("githubEvents", KafkaStarterUtils.getTopicCreationProps(2)); + printStatus(Quickstart.Color.CYAN, "***** Starting githubEvents data stream and publishing to Kafka *****"); + publishStreamDataToKafka("githubEvents", new File(quickstartTmpDir, "githubEvents")); + break; + default: + throw new UnsupportedOperationException("Unknown stream name: " + streamName); + } + } + } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java index f635dc098b..f0b66ef1df 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java @@ -19,6 +19,7 @@ package org.apache.pinot.tools; import java.io.File; +import java.util.Objects; import org.apache.pinot.spi.config.table.TableType; @@ -103,4 +104,24 @@ public class QuickstartTableRequest { + ", tableRequestFile = " + _tableRequestFile + ", ingestionJobFile = " + _ingestionJobFile + ", bootstrapTableDir = " + _bootstrapTableDir + " }"; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + QuickstartTableRequest that = (QuickstartTableRequest) o; + return Objects.equals(_tableName, that._tableName) && _tableType == that._tableType + && Objects.equals(_schemaFile, that._schemaFile) && Objects.equals(_tableRequestFile, + that._tableRequestFile) && Objects.equals(_ingestionJobFile, that._ingestionJobFile) + && Objects.equals(_bootstrapTableDir, that._bootstrapTableDir); + } + + @Override + public int hashCode() { + return Objects.hash(_tableName, _tableType, _schemaFile, _tableRequestFile, _ingestionJobFile, _bootstrapTableDir); + } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java index 2d9e6f7d09..0027008c01 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java @@ -18,33 +18,20 @@ */ package org.apache.pinot.tools; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import java.io.File; -import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.utils.ZkStarter; -import org.apache.pinot.spi.stream.StreamDataProvider; -import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.streams.MeetupRsvpStream; -import org.apache.pinot.tools.utils.KafkaStarterUtils; -import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; - -public class RealtimeComplexTypeHandlingQuickStart extends QuickStartBase { +public class RealtimeComplexTypeHandlingQuickStart extends RealtimeQuickStart { @Override public List<String> types() { - return Arrays.asList("REALTIME_COMPLEX_TYPE", "REALTIME-COMPLEX-TYPE", - "STREAM_COMPLEX_TYPE", "STREAM-COMPLEX-TYPE"); + return Arrays.asList("REALTIME_COMPLEX_TYPE", "REALTIME-COMPLEX-TYPE", + "STREAM_COMPLEX_TYPE", "STREAM-COMPLEX-TYPE"); } - private StreamDataServerStartable _kafkaStarter; public static void main(String[] args) throws Exception { @@ -54,71 +41,15 @@ public class RealtimeComplexTypeHandlingQuickStart extends QuickStartBase { PinotAdministrator.main(arguments.toArray(new String[arguments.size()])); } - public void execute() + @Override + public void runSampleQueries(QuickstartRunner runner) throws Exception { - File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); - File baseDir = new File(quickstartTmpDir, "meetupRsvp"); - File dataDir = new File(baseDir, "data"); - Preconditions.checkState(dataDir.mkdirs()); - - File schemaFile = new File(baseDir, "meetupRsvp_schema.json"); - File tableConfigFile = new File(baseDir, "meetupRsvp_realtime_table_config.json"); - - ClassLoader classLoader = Quickstart.class.getClassLoader(); - URL resource = classLoader.getResource("examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, schemaFile); - resource = - classLoader.getResource("examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, tableConfigFile); - - QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); - QuickstartRunner runner = - new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, getConfigOverrides()); - - printStatus(Color.CYAN, "***** Starting Kafka *****"); - ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); - try { - _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, - KafkaStarterUtils.getDefaultKafkaConfiguration()); - } catch (Exception e) { - throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); - } - _kafkaStarter.start(); - _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); - printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); - MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(); - meetupRSVPProvider.run(); - printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); - runner.startAll(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); - runner.stop(); - meetupRSVPProvider.stopPublishing(); - _kafkaStarter.stop(); - ZkStarter.stopLocalZkServer(zookeeperInstance); - FileUtils.deleteDirectory(quickstartTmpDir); - } catch (Exception e) { - e.printStackTrace(); - } - })); - printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****"); - runner.bootstrapTable(); - printStatus(Color.CYAN, "***** Waiting for 20 seconds for a few events to get populated *****"); - Thread.sleep(20000); - - printStatus(Color.YELLOW, "***** Realtime json-index quickstart setup complete *****"); - String q1 = - "select \"group.group_topics.urlkey\", \"group.group_topics.topic_name\", \"group.group_id\" from meetupRsvp " - + "limit 10"; + "select \"group.group_topics.urlkey\", \"group.group_topics.topic_name\", \"group.group_id\" from " + + "meetupRsvpComplexType limit 10"; printStatus(Color.YELLOW, "Events related to fitness"); printStatus(Color.CYAN, "Query : " + q1); printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1))); printStatus(Color.GREEN, "***************************************************"); - - printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console"); } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java index e77d611fb0..75d643497e 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java @@ -18,33 +18,31 @@ */ package org.apache.pinot.tools; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import java.io.File; -import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.utils.ZkStarter; -import org.apache.pinot.spi.stream.StreamDataProvider; -import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.streams.MeetupRsvpStream; -import org.apache.pinot.tools.utils.KafkaStarterUtils; -import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; - -public class RealtimeJsonIndexQuickStart extends QuickStartBase { +public class RealtimeJsonIndexQuickStart extends RealtimeQuickStart { @Override public List<String> types() { return Arrays.asList("REALTIME_JSON_INDEX", "REALTIME-JSON-INDEX", "STREAM_JSON_INDEX", "STREAM-JSON-INDEX"); } - private StreamDataServerStartable _kafkaStarter; + @Override + public void runSampleQueries(QuickstartRunner runner) + throws Exception { + String q1 = "select json_extract_scalar(event_json, '$.event_name', 'STRING') from meetupRsvpJson where json_match" + + "(group_json, '\"$.group_topics[*].topic_name\"=''topic_name0''') limit 10"; + printStatus(Color.YELLOW, "Events related to fitness"); + printStatus(Color.CYAN, "Query : " + q1); + printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1))); + + printStatus(Color.GREEN, "***************************************************"); + } public static void main(String[] args) throws Exception { @@ -53,70 +51,4 @@ public class RealtimeJsonIndexQuickStart extends QuickStartBase { arguments.addAll(Arrays.asList(args)); PinotAdministrator.main(arguments.toArray(new String[arguments.size()])); } - - public void execute() - throws Exception { - File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); - File baseDir = new File(quickstartTmpDir, "meetupRsvp"); - File dataDir = new File(baseDir, "data"); - Preconditions.checkState(dataDir.mkdirs()); - - File schemaFile = new File(baseDir, "meetupRsvp_schema.json"); - File tableConfigFile = new File(baseDir, "meetupRsvp_realtime_table_config.json"); - - ClassLoader classLoader = Quickstart.class.getClassLoader(); - URL resource = classLoader.getResource("examples/stream/meetupRsvp/json_meetupRsvp_schema.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, schemaFile); - resource = classLoader.getResource("examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, tableConfigFile); - - QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); - QuickstartRunner runner = - new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, getConfigOverrides()); - - printStatus(Color.CYAN, "***** Starting Kafka *****"); - ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); - try { - _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, - KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance)); - } catch (Exception e) { - throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); - } - _kafkaStarter.start(); - _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); - printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); - MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(); - meetupRSVPProvider.run(); - printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); - runner.startAll(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); - runner.stop(); - meetupRSVPProvider.stopPublishing(); - _kafkaStarter.stop(); - ZkStarter.stopLocalZkServer(zookeeperInstance); - FileUtils.deleteDirectory(quickstartTmpDir); - } catch (Exception e) { - e.printStackTrace(); - } - })); - printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****"); - runner.bootstrapTable(); - printStatus(Color.CYAN, "***** Waiting for 20 seconds for a few events to get populated *****"); - Thread.sleep(20000); - - printStatus(Color.YELLOW, "***** Realtime json-index quickstart setup complete *****"); - - String q1 = "select json_extract_scalar(event_json, '$.event_name', 'STRING') from meetupRsvp where json_match" - + "(group_json, '\"$.group_topics[*].topic_name\"=''Fitness''') limit 10"; - printStatus(Color.YELLOW, "Events related to fitness"); - printStatus(Color.CYAN, "Query : " + q1); - printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1))); - - printStatus(Color.GREEN, "***************************************************"); - printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console"); - } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java index 626961c1ee..be8db25933 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java @@ -19,23 +19,14 @@ package org.apache.pinot.tools; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import java.io.File; -import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.utils.ZkStarter; -import org.apache.pinot.spi.stream.StreamDataProvider; -import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.streams.MeetupRsvpStream; -import org.apache.pinot.tools.utils.KafkaStarterUtils; - -import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; public class RealtimeQuickStart extends QuickStartBase { @@ -44,8 +35,6 @@ public class RealtimeQuickStart extends QuickStartBase { return Arrays.asList("REALTIME", "STREAM"); } - private StreamDataServerStartable _kafkaStarter; - public static void main(String[] args) throws Exception { List<String> arguments = new ArrayList<>(); @@ -54,62 +43,9 @@ public class RealtimeQuickStart extends QuickStartBase { PinotAdministrator.main(arguments.toArray(new String[arguments.size()])); } - public void execute() + @Override + public void runSampleQueries(QuickstartRunner runner) throws Exception { - File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); - File baseDir = new File(quickstartTmpDir, "meetupRsvp"); - File dataDir = new File(baseDir, "rawdata"); - Preconditions.checkState(dataDir.mkdirs()); - - File schemaFile = new File(baseDir, "meetupRsvp_schema.json"); - File tableConfigFile = new File(baseDir, "meetupRsvp_realtime_table_config.json"); - - ClassLoader classLoader = Quickstart.class.getClassLoader(); - URL resource = classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_schema.json"); - com.google.common.base.Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, schemaFile); - resource = classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json"); - com.google.common.base.Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, tableConfigFile); - - QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); - final QuickstartRunner runner = - new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, getConfigOverrides()); - - printStatus(Color.CYAN, "***** Starting Kafka *****"); - final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); - try { - _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, - KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance)); - } catch (Exception e) { - throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); - } - _kafkaStarter.start(); - _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(10)); - printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); - MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(); - meetupRSVPProvider.run(); - printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); - runner.startAll(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); - runner.stop(); - meetupRSVPProvider.stopPublishing(); - _kafkaStarter.stop(); - ZkStarter.stopLocalZkServer(zookeeperInstance); - FileUtils.deleteDirectory(quickstartTmpDir); - } catch (Exception e) { - e.printStackTrace(); - } - })); - printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****"); - runner.bootstrapTable(); - printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****"); - Thread.sleep(5000); - - printStatus(Color.YELLOW, "***** Realtime quickstart setup complete *****"); - String q1 = "select count(*) from meetupRsvp limit 1"; printStatus(Color.YELLOW, "Total number of documents in the table"); printStatus(Color.CYAN, "Query : " + q1); @@ -141,6 +77,40 @@ public class RealtimeQuickStart extends QuickStartBase { printStatus(Color.CYAN, "Query : " + q5); printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q5))); printStatus(Color.GREEN, "***************************************************"); + } + + public void execute() + throws Exception { + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); + File quickstartRunnerDir = new File(quickstartTmpDir, "quickstart"); + Preconditions.checkState(quickstartRunnerDir.mkdirs()); + List<QuickstartTableRequest> quickstartTableRequests = bootstrapStreamTableDirectories(quickstartTmpDir); + final QuickstartRunner runner = + new QuickstartRunner(quickstartTableRequests, 1, 1, 1, 1, quickstartRunnerDir, getConfigOverrides()); + + startKafka(); + startAllDataStreams(_kafkaStarter, quickstartTmpDir); + + printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker, server and minion *****"); + runner.startAll(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); + runner.stop(); + FileUtils.deleteDirectory(quickstartTmpDir); + } catch (Exception e) { + e.printStackTrace(); + } + })); + + printStatus(Color.CYAN, "***** Bootstrap all tables *****"); + runner.bootstrapTable(); + + printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****"); + Thread.sleep(5000); + + printStatus(Color.YELLOW, "***** Realtime quickstart setup complete *****"); + runSampleQueries(runner); printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console"); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java index 60786d82a3..b51baf6179 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java @@ -18,122 +18,29 @@ */ package org.apache.pinot.tools; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import java.io.File; -import java.net.URL; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.LineIterator; -import org.apache.pinot.common.utils.ZkStarter; -import org.apache.pinot.spi.stream.StreamDataProducer; -import org.apache.pinot.spi.stream.StreamDataProvider; -import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.utils.KafkaStarterUtils; - -import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; /** * This quickstart shows how RealtimeToOfflineSegmentsTask and MergeRollupTask minion * tasks continuously optimize segments as data gets ingested into Realtime table. */ -public class RealtimeQuickStartWithMinion extends QuickStartBase { +public class RealtimeQuickStartWithMinion extends RealtimeQuickStart { @Override public List<String> types() { return Arrays.asList("REALTIME_MINION", "REALTIME-MINION"); } - private StreamDataServerStartable _kafkaStarter; - - public static void main(String[] args) - throws Exception { - List<String> arguments = new ArrayList<>(); - arguments.addAll(Arrays.asList("QuickStart", "-type", "REALTIME-MINION")); - arguments.addAll(Arrays.asList(args)); - PinotAdministrator.main(arguments.toArray(new String[arguments.size()])); - } - - public Map<String, Object> getConfigOverrides() { - Map<String, Object> properties = new HashMap<>(super.getConfigOverrides()); - properties.putIfAbsent("controller.task.scheduler.enabled", true); - return properties; - } - - public void execute() + @Override + public void runSampleQueries(QuickstartRunner runner) throws Exception { - File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); - File baseDir = new File(quickstartTmpDir, "githubEvents"); - File dataDir = new File(baseDir, "rawdata"); - Preconditions.checkState(dataDir.mkdirs()); - - File schemaFile = new File(baseDir, "githubEvents_schema.json"); - File realtimeTableConfigFile = new File(baseDir, "githubEvents_realtime_table_config.json"); - File offlineTableConfigFile = new File(baseDir, "githubEvents_offline_table_config.json"); - File inputDataFile = new File(baseDir, "2021-07-21-few-hours.json"); - - ClassLoader classLoader = Quickstart.class.getClassLoader(); - URL resource = classLoader.getResource("examples/minions/stream/githubEvents/githubEvents_schema.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, schemaFile); - resource = classLoader.getResource("examples/minions/stream/githubEvents/githubEvents_realtime_table_config.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, realtimeTableConfigFile); - resource = classLoader.getResource("examples/minions/stream/githubEvents/githubEvents_offline_table_config.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, offlineTableConfigFile); - resource = Quickstart.class.getClassLoader() - .getResource("examples/minions/stream/githubEvents/rawdata/2021-07-21-few-hours.json"); - Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, inputDataFile); - - QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); - QuickstartRunner runner = - new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, - dataDir, true, null, getConfigOverrides(), null, true); - - printStatus(Color.CYAN, "***** Starting Kafka *****"); - final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); - try { - _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, - KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance)); - } catch (Exception e) { - throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); - } - _kafkaStarter.start(); - - printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); - runner.startAll(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - printStatus(Color.GREEN, "***** Shutting down realtime-minion quick start *****"); - runner.stop(); - _kafkaStarter.stop(); - ZkStarter.stopLocalZkServer(zookeeperInstance); - FileUtils.deleteDirectory(quickstartTmpDir); - } catch (Exception e) { - e.printStackTrace(); - } - })); - - printStatus(Color.CYAN, "***** Sending events to Kafka *****"); - _kafkaStarter.createTopic("githubEvents", KafkaStarterUtils.getTopicCreationProps(2)); - publishGithubEventsToKafka("githubEvents", inputDataFile); - - printStatus(Color.CYAN, "***** Bootstrap githubEvents tables *****"); - runner.bootstrapTable(); - printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****"); - Thread.sleep(5000); - printStatus(Color.YELLOW, "***** Realtime-minion quickstart setup complete *****"); String q1 = "select count(*) from githubEvents limit 1"; @@ -142,26 +49,21 @@ public class RealtimeQuickStartWithMinion extends QuickStartBase { printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1))); printStatus(Color.GREEN, "***************************************************"); - printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console"); printStatus(Color.GREEN, "In particular, you will find that OFFLINE table gets segments from REALTIME table;"); printStatus(Color.GREEN, "and segments in OFFLINE table get merged into larger ones within a few minutes."); } - private static void publishGithubEventsToKafka(String topicName, File dataFile) + public static void main(String[] args) throws Exception { - Properties properties = new Properties(); - properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER); - properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); - properties.put("request.required.acks", "1"); - StreamDataProducer producer = - StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); - try { - LineIterator dataStream = FileUtils.lineIterator(dataFile); - while (dataStream.hasNext()) { - producer.produce(topicName, dataStream.nextLine().getBytes(StandardCharsets.UTF_8)); - } - } finally { - producer.close(); - } + List<String> arguments = new ArrayList<>(); + arguments.addAll(Arrays.asList("QuickStart", "-type", "REALTIME-MINION")); + arguments.addAll(Arrays.asList(args)); + PinotAdministrator.main(arguments.toArray(new String[arguments.size()])); + } + + public Map<String, Object> getConfigOverrides() { + Map<String, Object> properties = new HashMap<>(super.getConfigOverrides()); + properties.putIfAbsent("controller.task.scheduler.enabled", true); + return properties; } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java index 383d2593ec..22af89ff27 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java @@ -26,9 +26,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.utils.ZkStarter; -import org.apache.pinot.spi.stream.StreamDataProvider; -import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; @@ -36,8 +33,6 @@ import org.apache.pinot.tools.streams.MeetupRsvpStream; import org.apache.pinot.tools.streams.RsvpSourceGenerator; import org.apache.pinot.tools.utils.KafkaStarterUtils; -import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; - public class UpsertJsonQuickStart extends QuickStartBase { @Override @@ -45,8 +40,6 @@ public class UpsertJsonQuickStart extends QuickStartBase { return Arrays.asList("UPSERT_JSON_INDEX", "UPSERT-JSON-INDEX"); } - private StreamDataServerStartable _kafkaStarter; - public static void main(String[] args) throws Exception { List<String> arguments = new ArrayList<>(); @@ -77,15 +70,7 @@ public class UpsertJsonQuickStart extends QuickStartBase { QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, getConfigOverrides()); - printStatus(Color.CYAN, "***** Starting Kafka *****"); - ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); - try { - _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, - KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance)); - } catch (Exception e) { - throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); - } - _kafkaStarter.start(); + startKafka(); _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(RsvpSourceGenerator.KeyColumn.RSVP_ID); @@ -97,8 +82,6 @@ public class UpsertJsonQuickStart extends QuickStartBase { printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); runner.stop(); meetupRSVPProvider.stopPublishing(); - _kafkaStarter.stop(); - ZkStarter.stopLocalZkServer(zookeeperInstance); FileUtils.deleteDirectory(quickstartTmpDir); } catch (Exception e) { e.printStackTrace(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java index ccf219325d..d67bbcdaf6 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java @@ -27,17 +27,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.utils.ZkStarter; -import org.apache.pinot.spi.stream.StreamDataProvider; -import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; import org.apache.pinot.tools.streams.MeetupRsvpStream; import org.apache.pinot.tools.utils.KafkaStarterUtils; -import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; - public class UpsertQuickStart extends QuickStartBase { @Override @@ -45,8 +40,6 @@ public class UpsertQuickStart extends QuickStartBase { return Collections.singletonList("UPSERT"); } - private StreamDataServerStartable _kafkaStarter; - public static void main(String[] args) throws Exception { List<String> arguments = new ArrayList<>(); @@ -77,15 +70,7 @@ public class UpsertQuickStart extends QuickStartBase { final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, getConfigOverrides()); - printStatus(Color.CYAN, "***** Starting Kafka *****"); - final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); - try { - _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, - KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance)); - } catch (Exception e) { - throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e); - } - _kafkaStarter.start(); + startKafka(); _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true); @@ -97,8 +82,6 @@ public class UpsertQuickStart extends QuickStartBase { printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); runner.stop(); meetupRSVPProvider.stopPublishing(); - _kafkaStarter.stop(); - ZkStarter.stopLocalZkServer(zookeeperInstance); FileUtils.deleteDirectory(quickstartTmpDir); } catch (Exception e) { e.printStackTrace(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java index 2dee145ac0..9b4aea8060 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java @@ -25,6 +25,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.spi.stream.StreamDataProvider; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.QuickStartBase; import org.apache.pinot.tools.Quickstart; import org.apache.pinot.tools.utils.KafkaStarterUtils; @@ -42,11 +43,6 @@ public class AirlineDataStream { private StreamDataProducer _producer; private PinotRealtimeSource _pinotStream; - public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File avroFile) - throws Exception { - this(pinotSchema, tableConfig, avroFile, getDefaultKafkaProducer()); - } - public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File avroFile, StreamDataProducer producer) throws IOException { _pinotSchema = pinotSchema; @@ -63,6 +59,13 @@ public class AirlineDataStream { + "every 60 events (which is approximately 60 seconds) *****"); } + public AirlineDataStream(File baseDir) + throws Exception { + this(Schema.fromFile(new File(baseDir, "airlineStats_schema.json")), + JsonUtils.fileToObject(new File(baseDir, "airlineStats_realtime_table_config.json"), TableConfig.class), + new File(baseDir, "rawdata/airlineStats_data.avro"), getDefaultKafkaProducer()); + } + public static StreamDataProducer getDefaultKafkaProducer() throws Exception { Properties properties = new Properties(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java index f42ffd7fe9..527ca283fb 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java @@ -30,23 +30,35 @@ import org.slf4j.LoggerFactory; public class MeetupRsvpStream { protected static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class); private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents"; - protected String _topicName = DEFAULT_TOPIC_NAME; + protected String _topicName; protected PinotRealtimeSource _pinotRealtimeSource; public MeetupRsvpStream() throws Exception { - this(false); + this(DEFAULT_TOPIC_NAME, RsvpSourceGenerator.KeyColumn.NONE); } public MeetupRsvpStream(boolean partitionByKey) throws Exception { // calling this constructor means that we wish to use EVENT_ID as key. RsvpId is used by MeetupRsvpJsonStream - this(partitionByKey ? RsvpSourceGenerator.KeyColumn.EVENT_ID : RsvpSourceGenerator.KeyColumn.NONE); + this(DEFAULT_TOPIC_NAME, + partitionByKey ? RsvpSourceGenerator.KeyColumn.EVENT_ID : RsvpSourceGenerator.KeyColumn.NONE); + } + + public MeetupRsvpStream(String topicName) + throws Exception { + this(topicName, RsvpSourceGenerator.KeyColumn.NONE); } public MeetupRsvpStream(RsvpSourceGenerator.KeyColumn keyColumn) throws Exception { + this(DEFAULT_TOPIC_NAME, keyColumn); + } + + public MeetupRsvpStream(String topicName, RsvpSourceGenerator.KeyColumn keyColumn) + throws Exception { + _topicName = topicName; Properties properties = new Properties(); properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER); properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java index d192d29d1b..cf04a9acbe 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java @@ -19,6 +19,7 @@ package org.apache.pinot.tools.streams; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import java.time.LocalDateTime; @@ -50,17 +51,62 @@ public class RsvpSourceGenerator implements PinotSourceDataGenerator { public RSVP createMessage() { String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + ""; ObjectNode json = JsonUtils.newObjectNode(); - json.put("venue_name", "venue_name" + ThreadLocalRandom.current().nextInt()); - json.put("event_name", "event_name" + ThreadLocalRandom.current().nextInt()); + ObjectNode eventJson = JsonUtils.newObjectNode(); + json.set("event", eventJson); + ObjectNode groupJson = JsonUtils.newObjectNode(); + json.set("group", groupJson); + ObjectNode venueJson = JsonUtils.newObjectNode(); + json.set("venue", venueJson); + + String venueName = "venue_name" + ThreadLocalRandom.current().nextInt(); + venueJson.put("venue_name", venueName); + json.put("venue_name", venueName); + + String eventName = "event_name" + ThreadLocalRandom.current().nextInt(); + eventJson.put("event_name", eventName); + json.put("event_name", eventName); + json.put("event_id", eventId); json.put("event_time", DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10))); - json.put("group_city", "group_city" + ThreadLocalRandom.current().nextInt(1000)); - json.put("group_country", "group_country" + ThreadLocalRandom.current().nextInt(100)); - json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong())); - json.put("group_name", "group_name" + ThreadLocalRandom.current().nextInt()); - json.put("group_lat", ThreadLocalRandom.current().nextDouble(-90.0, 90.0)); - json.put("group_lon", ThreadLocalRandom.current().nextDouble(180.0)); + + ArrayNode groupTopicsJson = JsonUtils.newArrayNode(); + groupJson.set("group_topics", groupTopicsJson); + for (int i = 0; i < ThreadLocalRandom.current().nextInt(5) + 1; i++) { + ObjectNode groupTopicJson = JsonUtils.newObjectNode(); + groupTopicJson.put("topic_name", "topic_name" + ThreadLocalRandom.current().nextInt(10)); + groupTopicJson.put("urlkey", "http://group-url-" + ThreadLocalRandom.current().nextInt(1000)); + groupTopicsJson.add(groupTopicJson); + } + + String groupCity = "group_city" + ThreadLocalRandom.current().nextInt(1000); + groupJson.put("group_city", groupCity); + json.put("group_city", groupCity); + + String groupCountry = "group_country" + ThreadLocalRandom.current().nextInt(100); + groupJson.put("group_country", groupCountry); + json.put("group_country", groupCountry); + + long groupId = Math.abs(ThreadLocalRandom.current().nextLong()); + groupJson.put("group_id", groupId); + json.put("group_id", groupId); + + String groupName = "group_name" + ThreadLocalRandom.current().nextInt(); + groupJson.put("group_name", groupName); + json.put("group_name", groupName); + + double groupLat = ThreadLocalRandom.current().nextDouble(-90.0, 90.0); + groupJson.put("group_lat", groupLat); + json.put("group_lat", groupLat); + + double groupLon = ThreadLocalRandom.current().nextDouble(-90.0, 90.0); + groupJson.put("group_lon", groupLon); + json.put("group_lon", groupLon); + json.put("mtime", DateTime.now().getMillis()); + + json.put("rsvp_id", ThreadLocalRandom.current().nextLong()); + json.put("guests", ThreadLocalRandom.current().nextInt(100)); + json.put("rsvp_count", 1); return new RSVP(eventId, eventId, json); } @@ -91,6 +137,7 @@ public class RsvpSourceGenerator implements PinotSourceDataGenerator { public void close() throws Exception { } + public enum KeyColumn { NONE, EVENT_ID, diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json index cb5d690628..94da8849f2 100644 --- a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json @@ -29,7 +29,33 @@ "stream.kafka.consumer.prop.auto.offset.reset": "smallest" } }, + "fieldConfigList": [ + { + "name": "ts", + "encodingType": "DICTIONARY", + "indexTypes": ["TIMESTAMP"], + "timestampConfig": { + "granularities": [ + "DAY", + "WEEK", + "MONTH" + ] + } + } + ], "metadata": { "customConfigs": {} + }, + "ingestionConfig": { + "transformConfigs": [ + { + "columnName": "ts", + "transformFunction": "fromEpochDays(DaysSinceEpoch)" + }, + { + "columnName": "tsRaw", + "transformFunction": "fromEpochDays(DaysSinceEpoch)" + } + ] } } diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json index 97757af519..563e5caa19 100644 --- a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json +++ b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json @@ -330,6 +330,18 @@ "dataType": "INT", "format": "1:DAYS:EPOCH", "granularity": "1:DAYS" + }, + { + "name": "ts", + "dataType": "TIMESTAMP", + "format": "1:MILLISECONDS:TIMESTAMP", + "granularity": "1:SECONDS" + }, + { + "name": "tsRaw", + "dataType": "TIMESTAMP", + "format": "1:MILLISECONDS:TIMESTAMP", + "granularity": "1:SECONDS" } ], "schemaName": "airlineStats" diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.avro b/pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.avro similarity index 100% rename from pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.avro rename to pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.avro diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.json b/pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.json similarity index 100% rename from pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.json rename to pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.json diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.orc b/pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.orc similarity index 100% rename from pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.orc rename to pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.orc diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json index 6742b2177b..e2420e12ab 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json @@ -8,9 +8,25 @@ "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", "schemaName": "meetupRsvp", "replication": "1", - "replicasPerPartition": "1" + "replicasPerPartition": "1", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "1" }, "tenants": {}, + "fieldConfigList": [ + { + "name": "mtime", + "encodingType": "DICTIONARY", + "indexTypes": ["TIMESTAMP"], + "timestampConfig": { + "granularities": [ + "DAY", + "WEEK", + "MONTH" + ] + } + } + ], "tableIndexConfig": { "loadMode": "MMAP", "streamConfigs": { diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json index 1a144425a2..41379c2569 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json @@ -8,9 +8,25 @@ "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", "schemaName": "meetupRsvp", "replication": "1", - "replicasPerPartition": "1" + "replicasPerPartition": "1", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "1" }, "tenants": {}, + "fieldConfigList": [ + { + "name": "mtime", + "encodingType": "DICTIONARY", + "indexTypes": ["TIMESTAMP"], + "timestampConfig": { + "granularities": [ + "DAY", + "WEEK", + "MONTH" + ] + } + } + ], "tableIndexConfig": { "loadMode": "MMAP", "streamConfigs": { diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_realtime_table_config.json similarity index 57% copy from pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json copy to pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_realtime_table_config.json index 7b20d696e1..43662de8fd 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_realtime_table_config.json @@ -1,5 +1,5 @@ { - "tableName": "meetupRsvp", + "tableName": "meetupRsvpComplexType", "tableType": "REALTIME", "tenants": {}, "segmentsConfig": { @@ -16,7 +16,7 @@ { "streamType": "kafka", "stream.kafka.consumer.type": "lowLevel", - "stream.kafka.topic.name": "meetupRSVPEvents", + "stream.kafka.topic.name": "meetupRSVPComplexTypeEvents", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", @@ -26,38 +26,15 @@ ] }, "transformConfigs": [ - { - "columnName": "event_json", - "transformFunction": "jsonFormat(event)" - }, - { - "columnName": "group_json", - "transformFunction": "jsonFormat(\"group\")" - }, - { - "columnName": "member_json", - "transformFunction": "jsonFormat(member)" - }, - { - "columnName": "venue_json", - "transformFunction": "jsonFormat(venue)" - } - ] + ], + "complexTypeConfig": { + "fieldsToUnnest": [ + "group.group_topics" + ] + } }, "tableIndexConfig": { - "loadMode": "MMAP", - "noDictionaryColumns": [ - "event_json", - "group_json", - "member_json", - "venue_json" - ], - "jsonIndexColumns": [ - "event_json", - "group_json", - "member_json", - "venue_json" - ] + "loadMode": "MMAP" }, "metadata": { "customConfigs": {} diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_schema.json new file mode 100644 index 0000000000..0463c01cc1 --- /dev/null +++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_schema.json @@ -0,0 +1,63 @@ +{ + "schemaName": "meetupRsvpComplexType", + "dimensionFieldSpecs": [ + { + "name": "group.group_topics.urlkey", + "dataType": "STRING" + }, + { + "name": "group.group_topics.topic_name", + "dataType": "STRING" + }, + { + "name": "group.group_city", + "dataType": "STRING" + }, + { + "name": "group.group_country", + "dataType": "STRING" + }, + { + "name": "group.group_id", + "dataType": "INT" + }, + { + "name": "group.group_name", + "dataType": "STRING" + }, + { + "name": "group.group_lon", + "dataType": "DOUBLE" + }, + { + "name": "group.group_urlname", + "dataType": "STRING" + }, + { + "name": "group.group_state", + "dataType": "STRING" + }, + { + "name": "group.group_lat", + "dataType": "DOUBLE" + }, + { + "name": "rsvp_id", + "dataType": "LONG" + } + ], + "metricFieldSpecs": [ + { + "name": "guests", + "dataType": "INT" + } + ], + "dateTimeFieldSpecs": [ + { + "name": "mtime", + "dataType": "LONG", + "format": "1:MILLISECONDS:EPOCH", + "granularity": "1:MILLISECONDS" + } + ] +} diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_realtime_table_config.json similarity index 94% rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json rename to pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_realtime_table_config.json index 7b20d696e1..6ab8dd4f36 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_realtime_table_config.json @@ -1,5 +1,5 @@ { - "tableName": "meetupRsvp", + "tableName": "meetupRsvpJson", "tableType": "REALTIME", "tenants": {}, "segmentsConfig": { @@ -16,7 +16,7 @@ { "streamType": "kafka", "stream.kafka.consumer.type": "lowLevel", - "stream.kafka.topic.name": "meetupRSVPEvents", + "stream.kafka.topic.name": "meetupRSVPJsonEvents", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_schema.json similarity index 95% rename from pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json rename to pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_schema.json index afeee95f6b..23753ffb02 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json +++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_schema.json @@ -1,5 +1,5 @@ { - "schemaName": "meetupRsvp", + "schemaName": "meetupRsvpJson", "dimensionFieldSpecs": [ { "name": "event_json", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org