This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch fixing_hadoop_copy_plugin_dir in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4e21e1fb072cec417e01473dc1e8e2b3a6f86b08 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Tue Jan 26 13:45:33 2021 -0800 Fixing hadoop plugins jar copy issue --- .../batch/hadoop/HadoopSegmentGenerationJobRunner.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java index 59beadc..7efa405 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.plugin.ingestion.batch.hadoop; +import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.PINOT_PLUGINS_DIR; import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ; import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME; @@ -167,6 +168,8 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge outputDirFS.mkdir(stagingInputDir.toUri()); Path stagingSegmentTarUri = new Path(stagingDirURI.toString(), SEGMENT_TAR_DIR); outputDirFS.mkdir(stagingSegmentTarUri.toUri()); + Path stagingPluginDir = new Path(stagingDirURI.toString(), PINOT_PLUGINS_DIR); + outputDirFS.mkdir(stagingPluginDir.toUri()); //Get list of files to process String[] files = inputDirFS.listFiles(inputDirURI, true); @@ -240,7 +243,7 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge // In order to ensure 
pinot plugins would be loaded to each worker, this method // tars entire plugins directory and set this file into Distributed cache. // Then each mapper job will untar the plugin tarball, and set system properties accordingly. - packPluginsToDistributedCache(job); + packPluginsToDistributedCache(job, outputDirFS, stagingPluginDir); // Add dependency jars if (_spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) { @@ -285,7 +288,8 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge return HadoopSegmentCreationMapper.class; } - protected void packPluginsToDistributedCache(Job job) { + protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS, Path stagingPluginDir) + throws Exception { File pluginsRootDir = new File(PluginManager.get().getPluginsRootDir()); if (pluginsRootDir.exists()) { File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ); @@ -295,7 +299,9 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge LOGGER.error("Failed to tar plugins directory", e); throw new RuntimeException(e); } - job.addCacheArchive(pluginsTarGzFile.toURI()); + final URI stagingPluginDirURI = stagingPluginDir.toUri(); + outputDirFS.copyFromLocalFile(pluginsTarGzFile, stagingPluginDirURI); + job.addCacheArchive(stagingPluginDirURI); String pluginsIncludes = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME); if (pluginsIncludes != null) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org