This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch fixing_hadoop_copy_plugin_dir
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 4e21e1fb072cec417e01473dc1e8e2b3a6f86b08
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Tue Jan 26 13:45:33 2021 -0800

    Fixing hadoop plugins jar copy issue
---
 .../batch/hadoop/HadoopSegmentGenerationJobRunner.java       | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index 59beadc..7efa405 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.hadoop;
 
+import static 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.PINOT_PLUGINS_DIR;
 import static 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ;
 import static 
org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME;
 
@@ -167,6 +168,8 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
     outputDirFS.mkdir(stagingInputDir.toUri());
     Path stagingSegmentTarUri = new Path(stagingDirURI.toString(), 
SEGMENT_TAR_DIR);
     outputDirFS.mkdir(stagingSegmentTarUri.toUri());
+    Path stagingPluginDir = new Path(stagingDirURI.toString(), 
PINOT_PLUGINS_DIR);
+    outputDirFS.mkdir(stagingPluginDir.toUri());
 
     //Get list of files to process
     String[] files = inputDirFS.listFiles(inputDirURI, true);
@@ -240,7 +243,7 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
       // In order to ensure pinot plugins would be loaded to each worker, this 
method
       // tars entire plugins directory and set this file into Distributed 
cache.
       // Then each mapper job will untar the plugin tarball, and set system 
properties accordingly.
-      packPluginsToDistributedCache(job);
+      packPluginsToDistributedCache(job, outputDirFS, stagingPluginDir);
 
       // Add dependency jars
       if 
(_spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) 
{
@@ -285,7 +288,8 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
     return HadoopSegmentCreationMapper.class;
   }
 
-  protected void packPluginsToDistributedCache(Job job) {
+  protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS, 
Path stagingPluginDir)
+      throws Exception {
     File pluginsRootDir = new File(PluginManager.get().getPluginsRootDir());
     if (pluginsRootDir.exists()) {
       File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
@@ -295,7 +299,9 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
         LOGGER.error("Failed to tar plugins directory", e);
         throw new RuntimeException(e);
       }
-      job.addCacheArchive(pluginsTarGzFile.toURI());
+      final URI stagingPluginDirURI = stagingPluginDir.toUri();
+      outputDirFS.copyFromLocalFile(pluginsTarGzFile, stagingPluginDirURI);
+      job.addCacheArchive(stagingPluginDirURI);
 
       String pluginsIncludes = 
System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME);
       if (pluginsIncludes != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to