This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new d19d604 Adding bootstrap table command and move quickstart to use it (#6220) d19d604 is described below commit d19d604a4d007e06def26b41f50706ad485ea314 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Mon Nov 2 17:07:55 2020 -0800 Adding bootstrap table command and move quickstart to use it (#6220) --- .../org/apache/pinot/tools/BootstrapTableTool.java | 132 +++++++++++++++++++++ .../apache/pinot/tools/GitHubEventsQuickstart.java | 6 +- .../org/apache/pinot/tools/HybridQuickstart.java | 44 +++---- .../java/org/apache/pinot/tools/Quickstart.java | 24 ++-- .../apache/pinot/tools/QuickstartTableRequest.java | 79 ++++++------ .../org/apache/pinot/tools/RealtimeQuickStart.java | 16 +-- .../org/apache/pinot/tools/UpsertQuickStart.java | 15 ++- .../pinot/tools/admin/PinotAdministrator.java | 2 + .../tools/admin/command/BootstrapTableCommand.java | 113 ++++++++++++++++++ .../tools/admin/command/QuickstartRunner.java | 19 ++- 10 files changed, 348 insertions(+), 102 deletions(-) diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java new file mode 100644 index 0000000..e795390 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.Reader; +import java.net.URL; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.tools.admin.command.AddTableCommand; +import org.apache.pinot.tools.utils.JarUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + + +public class BootstrapTableTool { + private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapTableTool.class); + private final String _controllerHost; + private final int _controllerPort; + private final String _tableDir; + + public BootstrapTableTool(String controllerHost, int controllerPort, String tableDir) { + _controllerHost = controllerHost; + _controllerPort = controllerPort; + _tableDir = tableDir; + } + + public boolean execute() + throws Exception { + File setupTableTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis())); + + File tableDir = new File(_tableDir); + String tableName = tableDir.getName(); + File schemaFile = new File(tableDir, String.format("%s_schema.json", tableName)); + if (!schemaFile.exists()) { + throw new RuntimeException( + "Unable to find schema file for table - " + tableName + ", at " + schemaFile.getAbsolutePath()); + } + boolean tableCreationResult = false; + File offlineTableConfigFile = new File(tableDir, String.format("%s_offline_table_config.json", tableName)); + if (offlineTableConfigFile.exists()) { + File ingestionJobSpecFile = new File(tableDir, "ingestionJobSpec.yaml"); + tableCreationResult = + bootstrapOfflineTable(setupTableTmpDir, tableName, schemaFile, offlineTableConfigFile, ingestionJobSpecFile); + } + File realtimeTableConfigFile = new File(tableDir, String.format("%s_realtime_table_config.json", tableName)); + if (realtimeTableConfigFile.exists()) { + tableCreationResult = bootstrapRealtimeTable(tableName, schemaFile, realtimeTableConfigFile); + } + if (!tableCreationResult) { + throw new RuntimeException(String + .format("Unable to find config files for table - %s, at location [%s] or [%s].", tableName, + offlineTableConfigFile.getAbsolutePath(), realtimeTableConfigFile.getAbsolutePath())); + } + return true; + } + + private boolean bootstrapRealtimeTable(String tableName, File schemaFile, File realtimeTableConfigFile) + throws Exception { + LOGGER.info("Adding realtime table {}", tableName); + if (!createTable(schemaFile, realtimeTableConfigFile)) { + throw new RuntimeException(String + .format("Unable to create realtime table - %s from schema file [%s] and table conf file [%s].", tableName, + schemaFile, realtimeTableConfigFile)); + } + return true; + } + + private boolean createTable(File schemaFile, File tableConfigFile) + throws Exception { + return new AddTableCommand().setSchemaFile(schemaFile.getAbsolutePath()) + .setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerHost(_controllerHost) + .setControllerPort(String.valueOf(_controllerPort)).setExecute(true).execute(); + } + private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile, + File offlineTableConfigFile, File ingestionJobSpecFile) + throws Exception { + LOGGER.info("Adding offline table: {}", tableName); + boolean tableCreationResult = createTable(schemaFile, offlineTableConfigFile); + + if (!tableCreationResult) { + throw new RuntimeException(String + .format("Unable to create offline table - %s from schema file [%s] and table conf file [%s].", tableName, + schemaFile, offlineTableConfigFile)); + } + + if (ingestionJobSpecFile.exists()) { + LOGGER.info("Launch data ingestion job to build index segment for table {} and push to controller [{}:{}]", + tableName, _controllerHost, _controllerPort); + try (Reader reader = new BufferedReader(new FileReader(ingestionJobSpecFile.getAbsolutePath()))) { + SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class); + String inputDirURI = spec.getInputDirURI(); + if (!new File(inputDirURI).exists()) { + URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI); + if (resolvedInputDirURI.getProtocol().equals("jar")) { + String[] splits = resolvedInputDirURI.getFile().split("!"); + String inputDir = new File(setupTableTmpDir, "inputData").toString(); + JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir); + spec.setInputDirURI(inputDir); + } else { + spec.setInputDirURI(resolvedInputDirURI.toString()); + } + } + IngestionJobLauncher.runIngestionJob(spec); + } + } else { + LOGGER.info("Not found ingestionJobSpec.yaml at location [{}], skipping data ingestion", + ingestionJobSpecFile.getAbsolutePath()); + } + return true; + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java index 0096a4b..09d7522 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java @@ -62,7 +62,7 @@ public class GitHubEventsQuickstart { public void execute(String personalAccessToken) throws Exception { - final File quickStartDataDir = new File("githubEvents" + System.currentTimeMillis()); + final File quickStartDataDir = new File(new File("githubEvents-" + System.currentTimeMillis()), "pullRequestMergedEvents"); if (!quickStartDataDir.exists()) { Preconditions.checkState(quickStartDataDir.mkdirs()); @@ -82,7 +82,7 @@ public class GitHubEventsQuickstart { File tempDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis())); Preconditions.checkState(tempDir.mkdirs()); - QuickstartTableRequest request = new QuickstartTableRequest("pullRequestMergedEvents", schemaFile, tableConfigFile); + QuickstartTableRequest request = new QuickstartTableRequest(quickStartDataDir.getAbsolutePath()); final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, tempDir); printStatus(Color.CYAN, "***** Starting Kafka *****"); @@ -92,7 +92,7 @@ public class GitHubEventsQuickstart { runner.startAll(); printStatus(Color.CYAN, "***** Adding pullRequestMergedEvents table *****"); - runner.addTable(); + runner.bootstrapTable(); printStatus(Color.CYAN, "***** Starting pullRequestMergedEvents data stream and publishing to Kafka *****"); final PullRequestMergedEventsStream pullRequestMergedEventsStream = diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java index d682218..4d785ee 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java @@ -27,7 +27,6 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.ZkStarter; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.stream.StreamDataProvider; import org.apache.pinot.spi.stream.StreamDataServerStartable; @@ -57,12 +56,11 @@ public class HybridQuickstart { new HybridQuickstart().execute(); } - private QuickstartTableRequest prepareOfflineTableRequest(File configDir) + private QuickstartTableRequest prepareTableRequest(File baseDir) throws IOException { - - _schemaFile = new File(configDir, "airlineStats_schema.json"); - _ingestionJobSpecFile = new File(configDir, "ingestionJobSpec.yaml"); - File tableConfigFile = new File(configDir, "airlineStats_offline_table_config.json"); + _schemaFile = new File(baseDir, "airlineStats_schema.json"); + _ingestionJobSpecFile = new File(baseDir, "ingestionJobSpec.yaml"); + File tableConfigFile = new File(baseDir, "airlineStats_offline_table_config.json"); ClassLoader classLoader = Quickstart.class.getClassLoader(); URL resource = classLoader.getResource("examples/batch/airlineStats/airlineStats_schema.json"); @@ -75,26 +73,18 @@ public class HybridQuickstart { Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, tableConfigFile); - return new QuickstartTableRequest("airlineStats", _schemaFile, tableConfigFile, _ingestionJobSpecFile, - FileFormat.AVRO); - } - - private QuickstartTableRequest prepareRealtimeTableRequest(File configDir) - throws IOException { - - _dataFile = new File(configDir, "airlineStats_data.avro"); - _realtimeTableConfigFile = new File(configDir, "airlineStats_realtime_table_config.json"); - - URL resource = Quickstart.class.getClassLoader() + _realtimeTableConfigFile = new File(baseDir, "airlineStats_realtime_table_config.json"); + resource = Quickstart.class.getClassLoader() .getResource("examples/stream/airlineStats/airlineStats_realtime_table_config.json"); Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, _realtimeTableConfigFile); resource = Quickstart.class.getClassLoader() .getResource("examples/stream/airlineStats/sample_data/airlineStats_data.avro"); Preconditions.checkNotNull(resource); + _dataFile = new File(baseDir, "airlineStats_data.avro"); FileUtils.copyURLToFile(resource, _dataFile); - return new QuickstartTableRequest("airlineStats", _schemaFile, _realtimeTableConfigFile); + return new QuickstartTableRequest(baseDir.getAbsolutePath()); } private void startKafka() { @@ -111,16 +101,13 @@ public class HybridQuickstart { public void execute() throws Exception { - File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis())); - File configDir = new File(quickstartTmpDir, "configs"); - File dataDir = new File(quickstartTmpDir, "data"); - Preconditions.checkState(configDir.mkdirs()); + File baseDir = new File(quickstartTmpDir, "airlineStats"); + File dataDir = new File(baseDir, "data"); Preconditions.checkState(dataDir.mkdirs()); - QuickstartTableRequest offlineRequest = prepareOfflineTableRequest(configDir); - QuickstartTableRequest realtimeTableRequest = prepareRealtimeTableRequest(configDir); + QuickstartTableRequest bootstrapTableRequest = prepareTableRequest(baseDir); final QuickstartRunner runner = - new QuickstartRunner(Lists.newArrayList(offlineRequest, realtimeTableRequest), 1, 1, 1, dataDir); + new QuickstartRunner(Lists.newArrayList(bootstrapTableRequest), 1, 1, 1, dataDir); printStatus(Color.YELLOW, "***** Starting Kafka *****"); startKafka(); printStatus(Color.YELLOW, "***** Starting airline data stream and publishing to Kafka *****"); @@ -142,11 +129,8 @@ public class HybridQuickstart { e.printStackTrace(); } })); - printStatus(Color.YELLOW, "***** Adding airlineStats offline and realtime table *****"); - runner.addTable(); - printStatus(Color.YELLOW, - "***** Launch data ingestion job to build index segments for airlineStats and push to controller *****"); - runner.launchDataIngestionJob(); + printStatus(Color.YELLOW, "***** Bootstrap airlineStats offline and realtime table *****"); + runner.bootstrapTable(); printStatus(Color.YELLOW, "***** Pinot Hybrid with hybrid table setup is complete *****"); printStatus(Color.YELLOW, "***** Sequence of operations *****"); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java index b28481a..69f5941 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java @@ -24,7 +24,6 @@ import com.google.common.collect.Lists; import java.io.File; import java.net.URL; import org.apache.commons.io.FileUtils; -import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.tools.admin.command.QuickstartRunner; @@ -135,15 +134,14 @@ public class Quickstart { public void execute() throws Exception { File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis())); - File configDir = new File(quickstartTmpDir, "configs"); - File dataDir = new File(quickstartTmpDir, "data"); - Preconditions.checkState(configDir.mkdirs()); + File baseDir = new File(quickstartTmpDir, "baseballStats"); + File dataDir = new File(baseDir, "rawdata"); Preconditions.checkState(dataDir.mkdirs()); - File schemaFile = new File(configDir, "baseballStats_schema.json"); - File dataFile = new File(configDir, "baseballStats_data.csv"); - File tableConfigFile = new File(configDir, "baseballStats_offline_table_config.json"); - File ingestionJobSpecFile = new File(configDir, "ingestionJobSpec.yaml"); + File schemaFile = new File(baseDir, "baseballStats_schema.json"); + File tableConfigFile = new File(baseDir, "baseballStats_offline_table_config.json"); + File ingestionJobSpecFile = new File(baseDir, "ingestionJobSpec.yaml"); + File dataFile = new File(dataDir, "baseballStats_data.csv"); ClassLoader classLoader = Quickstart.class.getClassLoader(); URL resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json"); @@ -159,8 +157,7 @@ public class Quickstart { com.google.common.base.Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, tableConfigFile); - QuickstartTableRequest request = - new QuickstartTableRequest("baseballStats", schemaFile, tableConfigFile, ingestionJobSpecFile, FileFormat.CSV); + QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir); printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****"); @@ -174,10 +171,9 @@ public class Quickstart { e.printStackTrace(); } })); - printStatus(Color.CYAN, "***** Adding baseballStats table *****"); - runner.addTable(); - printStatus(Color.CYAN, "***** Launch data ingestion job to build index segment for baseballStats and push to controller *****"); - runner.launchDataIngestionJob(); + printStatus(Color.CYAN, "***** Bootstrap baseballStats table *****"); + runner.bootstrapTable(); + printStatus(Color.CYAN, "***** Waiting for 5 seconds for the server to fetch the assigned segment *****"); Thread.sleep(5000); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java index 117ccf1..fcaa32f 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java @@ -20,80 +20,87 @@ package org.apache.pinot.tools; import java.io.File; import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.data.readers.FileFormat; public class QuickstartTableRequest { - File schemaFile; - File tableRequestFile; - File ingestionJobFile; - TableType tableType; - String tableName; - FileFormat segmentFileFormat = FileFormat.CSV; + private String _tableName; + private TableType _tableType; + private File _schemaFile; + private File _tableRequestFile; + private File _ingestionJobFile; + private String _bootstrapTableDir; - public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest, File ingestionJobFile, - FileFormat segmentFileFormat) { - this.tableName = tableName; - this.schemaFile = schemaFile; - this.tableRequestFile = tableRequest; - tableType = TableType.OFFLINE; - this.segmentFileFormat = segmentFileFormat; - this.ingestionJobFile = ingestionJobFile; + public QuickstartTableRequest(String bootstrapTableDir) { + this._bootstrapTableDir = bootstrapTableDir; } - public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest) { - this.tableName = tableName; - this.schemaFile = schemaFile; - this.tableRequestFile = tableRequest; - tableType = TableType.REALTIME; - } - - public FileFormat getSegmentFileFormat() { - return segmentFileFormat; + public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest, File ingestionJobFile) { + this._tableName = tableName; + this._schemaFile = schemaFile; + this._tableRequestFile = tableRequest; + _tableType = TableType.OFFLINE; + this._ingestionJobFile = ingestionJobFile; } - public void setSegmentFileFormat(FileFormat segmentFileFormat) { - this.segmentFileFormat = segmentFileFormat; + public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest) { + this._tableName = tableName; + this._schemaFile = schemaFile; + this._tableRequestFile = tableRequest; + _tableType = TableType.REALTIME; } public File getSchemaFile() { - return schemaFile; + return _schemaFile; } public void setSchemaFile(File schemaFile) { - this.schemaFile = schemaFile; + this._schemaFile = schemaFile; } public File getTableRequestFile() { - return tableRequestFile; + return _tableRequestFile; } public void setTableRequestFile(File tableRequestFile) { - this.tableRequestFile = tableRequestFile; + this._tableRequestFile = tableRequestFile; } public File getIngestionJobFile() { - return ingestionJobFile; + return _ingestionJobFile; } public void setIngestionJobFile(File ingestionJobFile) { - this.ingestionJobFile = ingestionJobFile; + this._ingestionJobFile = ingestionJobFile; } public TableType getTableType() { - return tableType; + return _tableType; } public void setTableType(TableType tableType) { - this.tableType = tableType; + this._tableType = tableType; } public String getTableName() { - return tableName; + return _tableName; } public void setTableName(String tableName) { - this.tableName = tableName; + this._tableName = tableName; + } + + public String getBootstrapTableDir() { + return _bootstrapTableDir; + } + + public void setBootstrapTableDir(String bootstrapTableDir) { + this._bootstrapTableDir = bootstrapTableDir; + } + + public String toString() { + return "{ tableName = " + _tableName + ", tableType = " + _tableType + ", schemaFile = " + _schemaFile + + ", tableRequestFile = " + _tableRequestFile + ", ingestionJobFile = " + _ingestionJobFile + + ", bootstrapTableDir = " + _bootstrapTableDir + " }"; } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java index 0af2b7c..2b1f9a0 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java @@ -48,13 +48,13 @@ public class RealtimeQuickStart { public void execute() throws Exception { File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis())); - File configDir = new File(quickstartTmpDir, "configs"); - File dataDir = new File(quickstartTmpDir, "data"); - Preconditions.checkState(configDir.mkdirs()); + + File baseDir = new File(quickstartTmpDir, "meetupRsvp"); + File dataDir = new File(baseDir, "rawdata"); Preconditions.checkState(dataDir.mkdirs()); - File schemaFile = new File(configDir, "meetupRsvp_schema.json"); - File tableConfigFile = new File(configDir, "meetupRsvp_realtime_table_config.json"); + File schemaFile = new File(baseDir, "meetupRsvp_schema.json"); + File tableConfigFile = new File(baseDir, "meetupRsvp_realtime_table_config.json"); ClassLoader classLoader = Quickstart.class.getClassLoader(); URL resource = classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_schema.json"); @@ -64,7 +64,7 @@ public class RealtimeQuickStart { com.google.common.base.Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, tableConfigFile); - QuickstartTableRequest request = new QuickstartTableRequest("meetupRsvp", schemaFile, tableConfigFile); + QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir); printStatus(Color.CYAN, "***** Starting Kafka *****"); @@ -93,8 +93,8 @@ public class RealtimeQuickStart { e.printStackTrace(); } })); - printStatus(Color.CYAN, "***** Adding meetupRSVP table *****"); - runner.addTable(); + printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****"); + runner.bootstrapTable(); printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****"); Thread.sleep(5000); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java index a457cac..e57174c 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java @@ -48,13 +48,12 @@ public class UpsertQuickStart { public void execute() throws Exception { File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis())); - File configDir = new File(quickstartTmpDir, "configs"); - File dataDir = new File(quickstartTmpDir, "data"); - Preconditions.checkState(configDir.mkdirs()); + File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp"); + File dataDir = new File(bootstrapTableDir, "data"); Preconditions.checkState(dataDir.mkdirs()); - File schemaFile = new File(configDir, "upsert_meetupRsvp_schema.json"); - File tableConfigFile = new File(configDir, "upsert_meetupRsvp_realtime_table_config.json"); + File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json"); + File tableConfigFile = new File(bootstrapTableDir, "meetupRsvp_realtime_table_config.json"); ClassLoader classLoader = Quickstart.class.getClassLoader(); URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json"); @@ -64,7 +63,7 @@ public class UpsertQuickStart { com.google.common.base.Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, tableConfigFile); - QuickstartTableRequest request = new QuickstartTableRequest("meetupRsvp", schemaFile, tableConfigFile); + QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath()); final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir); printStatus(Color.CYAN, "***** Starting Kafka *****"); @@ -94,8 +93,8 @@ public class UpsertQuickStart { e.printStackTrace(); } })); - printStatus(Color.CYAN, "***** Adding meetupRSVP(upsert) table *****"); - runner.addTable(); + printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****"); + runner.bootstrapTable(); printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****"); Thread.sleep(5000); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java index cdd620f..83c90da 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java @@ -41,6 +41,7 @@ import org.apache.pinot.tools.admin.command.QuickStartCommand; import org.apache.pinot.tools.admin.command.RealtimeProvisioningHelperCommand; import org.apache.pinot.tools.admin.command.RebalanceTableCommand; import org.apache.pinot.tools.admin.command.SegmentProcessorFrameworkCommand; +import org.apache.pinot.tools.admin.command.BootstrapTableCommand; import org.apache.pinot.tools.admin.command.ShowClusterInfoCommand; import org.apache.pinot.tools.admin.command.StartBrokerCommand; import org.apache.pinot.tools.admin.command.StartControllerCommand; @@ -127,6 +128,7 @@ public class PinotAdministrator { @SubCommand(name = "AnonymizeData", impl = AnonymizeDataCommand.class), @SubCommand(name = "GitHubEventsQuickStart", impl = GitHubEventsQuickStartCommand.class), @SubCommand(name = "StreamGitHubEvents", impl = StreamGitHubEventsCommand.class), + @SubCommand(name = "BootstrapTable", impl = BootstrapTableCommand.class), @SubCommand(name = "SegmentProcessorFramework", impl = SegmentProcessorFrameworkCommand.class) }) Command _subCommand; diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java new file mode 100644 index 0000000..7d725fc --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools.admin.command; + +import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.tools.Command; +import org.apache.pinot.tools.BootstrapTableTool; +import org.kohsuke.args4j.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The command to bootstrap a Pinot table from a directory with table schema/config/ingestionJobSpec/raw data files. + * + * Sample usage: + * {@code pinot-admin.sh BootstrapTable -dir <path-to-table-configs-directory> } + * + * The directory structure is based on current example conventions: + * For offline table: + * ``` + * <table_name>/ + * <table_name>/<table_name>_schema.json + * <table_name>/<table_name>_offline_table_config.json + * <table_name>/ingestionJobSpec.yaml + * <table_name>/rawdata/... + * ``` + * + * For realtime table: + * ``` + * <table_name>/ + * <table_name>/<table_name>_schema.json + * <table_name>/<table_name>_realtime_table_config.json + * ``` + * + * For hybrid table: + * ``` + * <table_name>/ + * <table_name>/<table_name>_schema.json + * <table_name>/<table_name>_offline_table_config.json + * <table_name>/<table_name>_realtime_table_config.json + * <table_name>/ingestionJobSpec.yaml + * <table_name>/rawdata/... + * ``` + */ +public class BootstrapTableCommand extends AbstractBaseAdminCommand implements Command { + private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapTableCommand.class.getName()); + + @Option(name = "-controllerHost", required = false, metaVar = "<String>", usage = "host name for controller.") + private String _controllerHost; + + @Option(name = "-controllerPort", required = false, metaVar = "<int>", usage = "http port for broker.") + private final String _controllerPort = DEFAULT_CONTROLLER_PORT; + + @Option(name = "-dir", required = false, aliases = {"-d", "-directory"}, metaVar = "<String>", usage = "The directory contains all the configs and data to bootstrap a table") + private String _dir; + + @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") + private final boolean _help = false; + + @Override + public boolean getHelp() { + return _help; + } + + @Override + public String getName() { + return "BootstrapTable"; + } + + public BootstrapTableCommand setDir(String dir) { + _dir = dir; + return this; + } + + @Override + public String toString() { + return ("BootstrapTable -dir " + _dir); + } + + @Override + public void cleanup() { + + } + + @Override + public String description() { + return "Run Pinot Bootstrap Table."; + } + + @Override + public boolean execute() + throws Exception { + PluginManager.get().init(); + return new BootstrapTableTool(_controllerHost, Integer.parseInt(_controllerPort), _dir).execute(); + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java index fd8aa21..e2e3e38 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.io.Reader; +import java.net.InetAddress; import java.net.URL; import java.util.ArrayList; import java.util.List; @@ -36,6 +37,7 @@ import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.QuickstartTableRequest; +import org.apache.pinot.tools.BootstrapTableTool; import org.apache.pinot.tools.utils.JarUtils; import org.yaml.snakeyaml.Yaml; @@ -169,6 +171,17 @@ public class QuickstartRunner { .setInstances(number).setRole(TenantRole.BROKER).setExecute(true).execute(); } + public void bootstrapTable() + throws Exception { + for (QuickstartTableRequest request : _tableRequests) { + if (!new BootstrapTableTool(InetAddress.getLocalHost().getHostName(), _controllerPorts.get(0), request.getBootstrapTableDir()) + .execute()) { + throw new RuntimeException("Failed to bootstrap table with request - " + request); + } + } + } + + @Deprecated public void addTable() throws Exception { for (QuickstartTableRequest request : _tableRequests) { @@ -178,6 +191,7 @@ public class QuickstartRunner { } } + @Deprecated public void launchDataIngestionJob() throws Exception { for (QuickstartTableRequest request : _tableRequests) { @@ -205,8 +219,7 @@ public class QuickstartRunner { public JsonNode runQuery(String query) throws Exception { int brokerPort = _brokerPorts.get(RANDOM.nextInt(_brokerPorts.size())); - return JsonUtils - .stringToJsonNode(new PostQueryCommand().setBrokerPort(String.valueOf(brokerPort)).setQueryType( - CommonConstants.Broker.Request.SQL).setQuery(query).run()); + return JsonUtils.stringToJsonNode(new PostQueryCommand().setBrokerPort(String.valueOf(brokerPort)) + .setQueryType(CommonConstants.Broker.Request.SQL).setQuery(query).run()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org