fx19880617 commented on a change in pull request #5741: URL: https://github.com/apache/incubator-pinot/pull/5741#discussion_r459671619
########## File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java ########## @@ -209,93 +210,99 @@ public void run() .get(PLUGINS_INCLUDE_PROPERTY_NAME) : null; final URI finalInputDirURI = inputDirURI; final URI finalOutputDirURI = (stagingDirURI == null) ? outputDirURI : stagingDirURI; - pathRDD.foreach(pathAndIdx -> { - for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) { - PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec)); - } - PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme()); - String[] splits = pathAndIdx.split(" "); - String path = splits[0]; - int idx = Integer.valueOf(splits[1]); - // Load Pinot Plugins copied from Distributed cache. - File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ); - if (localPluginsTarFile.exists()) { - File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx); - try { - TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile); - } catch (Exception e) { - LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e); - throw new RuntimeException(e); + // Prevent using lambda expression in Spark to avoid potential serialization exceptions, use inner function instead. + pathRDD.foreach(new VoidFunction<String>() { + @Override + public void call(String pathAndIdx) + throws Exception { + PluginManager.get().init(); + for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) { + PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec)); } - LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME, - pluginsDirFile.getAbsolutePath()); - System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath()); - if (pluginsInclude != null) { - LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude); - System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude); + PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme()); + String[] splits = pathAndIdx.split(" "); + String path = splits[0]; + int idx = Integer.valueOf(splits[1]); + // Load Pinot Plugins copied from Distributed cache. + File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ); + if (localPluginsTarFile.exists()) { + File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx); + try { + TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile); + } catch (Exception e) { + LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e); + throw new RuntimeException(e); + } + LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME, + pluginsDirFile.getAbsolutePath()); + System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath()); + if (pluginsInclude != null) { + LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude); + System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude); + } + LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]", + System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME)); + } else { + LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath()); + } + URI inputFileURI = URI.create(path); + if (inputFileURI.getScheme() == null) { + inputFileURI = + new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment()); } - LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]", - System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME)); - } else { - LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath()); - } - URI inputFileURI = URI.create(path); - if (inputFileURI.getScheme() == null) { - inputFileURI = - new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment()); - } - //create localTempDir for input and output - File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID()); - File localInputTempDir = new File(localTempDir, "input"); - FileUtils.forceMkdir(localInputTempDir); - File localOutputTempDir = new File(localTempDir, "output"); - FileUtils.forceMkdir(localOutputTempDir); + //create localTempDir for input and output Review comment: updated! ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org