This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ae8781959c [Feature] Add an option to search input files recursively 
in ingestion job. The default is set to true to be backward compatible. (#9265)
ae8781959c is described below

commit ae8781959ce0837f89767aec08783ac71c27f25c
Author: Yao Liu <y...@startree.ai>
AuthorDate: Wed Aug 24 14:54:41 2022 -0700

    [Feature] Add an option to search input files recursively in ingestion job. 
The default is set to true to be backward compatible. (#9265)
---
 .../segment/generation/SegmentGenerationUtils.java |  60 +++++++++++
 .../generation/SegmentGenerationUtilsTest.java     | 118 +++++++++++++++++++++
 .../hadoop/HadoopSegmentGenerationJobRunner.java   |  85 +++++----------
 .../spark/SparkSegmentGenerationJobRunner.java     |  49 +--------
 .../spark/SparkSegmentGenerationJobRunnerTest.java |   1 +
 .../spark3/SparkSegmentGenerationJobRunner.java    |  49 +--------
 .../SparkSegmentGenerationJobRunnerTest.java       |   1 +
 .../standalone/SegmentGenerationJobRunner.java     |  35 +-----
 .../standalone/SegmentGenerationJobRunnerTest.java |   1 +
 .../batch/spec/SegmentGenerationJobSpec.java       |  14 +++
 .../ingestion/batch/IngestionJobLauncherTest.java  |   4 +
 .../resources/ingestion_job_spec_template.yaml     |   1 +
 12 files changed, 234 insertions(+), 184 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
index 121f2da164..d25372bf37 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java
@@ -28,6 +28,12 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLConnection;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -229,4 +235,58 @@ public class SegmentGenerationUtils {
     }
     return IOUtils.toString(connection.getInputStream(), 
StandardCharsets.UTF_8);
   }
+
+
+  /**
+   * @param pinotFs root directory fs
+   * @param fileUri root directory uri
+   * @param includePattern optional glob patterns for files to include
+   * @param excludePattern optional glob patterns for files to exclude
+   * @param searchRecursively if true, search files recursively from directory 
specified in fileUri
+   * @return list of matching files.
+   * @throws IOException on IO failure for list files in root directory.
+   * @throws URISyntaxException for matching file URIs
+   * @throws RuntimeException if there is no matching file.
+   */
+  public static List<String> listMatchedFilesWithRecursiveOption(PinotFS 
pinotFs, URI fileUri,
+      @Nullable String includePattern, @Nullable String excludePattern, 
boolean searchRecursively)
+      throws Exception {
+    String[] files;
+    // listFiles throws IOException
+    files = pinotFs.listFiles(fileUri, searchRecursively);
+    //TODO: sort input files based on creation time
+    PathMatcher includeFilePathMatcher = null;
+    if (includePattern != null) {
+      includeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(includePattern);
+    }
+    PathMatcher excludeFilePathMatcher = null;
+    if (excludePattern != null) {
+      excludeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(excludePattern);
+    }
+    List<String> filteredFiles = new ArrayList<>();
+    for (String file : files) {
+      if (includeFilePathMatcher != null) {
+        if (!includeFilePathMatcher.matches(Paths.get(file))) {
+          continue;
+        }
+      }
+      if (excludeFilePathMatcher != null) {
+        if (excludeFilePathMatcher.matches(Paths.get(file))) {
+          continue;
+        }
+      }
+      if (!pinotFs.isDirectory(new URI(file))) {
+        // In case PinotFS implementations list files without a scheme (e.g. 
hdfs://), then we may lose it in the
+        // input file path. Call SegmentGenerationUtils.getFileURI() to fix 
this up.
+        // getFileURI throws URISyntaxException
+        filteredFiles.add(SegmentGenerationUtils.getFileURI(file, 
fileUri).toString());
+      }
+    }
+    if (filteredFiles.isEmpty()) {
+      throw new RuntimeException(String.format(
+          "No file found in the input directory: %s matching 
includeFileNamePattern: %s,"
+              + " excludeFileNamePattern: %s", fileUri, includePattern, 
excludePattern));
+    }
+    return filteredFiles;
+  }
 }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtilsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtilsTest.java
index 525c136ca2..cf2cbaf749 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtilsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtilsTest.java
@@ -19,10 +19,18 @@
 
 package org.apache.pinot.common.segment.generation;
 
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLEncoder;
+import java.nio.file.Files;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -124,4 +132,114 @@ public class SegmentGenerationUtilsTest {
       throws URISyntaxException {
     validateFileURI(directoryURI, directoryURI.toString());
   }
+
+  // Test that we search files recursively when recursiveSearch option is set 
to true.
+  @Test
+  public void testMatchFilesRecursiveSearchOnRecursiveInputFilePattern()
+      throws Exception {
+    File testDir = makeTestDir();
+    File inputDir = new File(testDir, "input");
+    File inputSubDir1 = new File(inputDir, "2009");
+    inputSubDir1.mkdirs();
+
+    File inputFile1 = new File(inputDir, "input.csv");
+    FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", 
"value1,1", "value2,2"));
+
+    File inputFile2 = new File(inputSubDir1, "input.csv");
+    FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", 
"value3,3", "value4,4"));
+    URI inputDirURI = new URI(inputDir.getAbsolutePath());
+    if (inputDirURI.getScheme() == null) {
+      inputDirURI = new File(inputDir.getAbsolutePath()).toURI();
+    }
+    PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
+    String includePattern = "glob:" + inputDir.getAbsolutePath() + "/**.csv";
+    List<String> files =
+        SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, 
inputDirURI, includePattern, null, true);
+    Assert.assertEquals(files.size(), 2);
+  }
+
+  // Test that we don't search files recursively when recursiveSearch option 
is set to false.
+  @Test
+  public void testMatchFilesRecursiveSearchOnNonRecursiveInputFilePattern()
+      throws Exception {
+    File testDir = makeTestDir();
+    File inputDir = new File(testDir, "dir");
+    File inputSubDir1 = new File(inputDir, "2009");
+    inputSubDir1.mkdirs();
+
+    File inputFile1 = new File(inputDir, "input.csv");
+    FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", 
"value1,1", "value2,2"));
+
+    File inputFile2 = new File(inputSubDir1, "input.csv");
+    FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", 
"value3,3", "value4,4"));
+    URI inputDirURI = new URI(inputDir.getAbsolutePath());
+    if (inputDirURI.getScheme() == null) {
+      inputDirURI = new File(inputDir.getAbsolutePath()).toURI();
+    }
+    PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
+    String includePattern = "glob:" + inputDir.getAbsolutePath() + "/*.csv";
+
+    List<String> files =
+        SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, 
inputDirURI, includePattern, null,
+            false);
+    Assert.assertEquals(files.size(), 1);
+  }
+
+  // Test that we exclude files that match exclude pattern.
+  @Test
+  public void testMatchFilesRecursiveSearchExcludeFilePattern()
+      throws Exception {
+    File testDir = makeTestDir();
+    File inputDir = new File(testDir, "dir");
+    File inputSubDir1 = new File(inputDir, "2009");
+    inputSubDir1.mkdirs();
+
+    File inputFile1 = new File(inputDir, "input1.csv");
+    FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", 
"value1,1", "value2,2"));
+
+    File inputFile2 = new File(inputSubDir1, "input2.csv");
+    FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", 
"value3,3", "value4,4"));
+    URI inputDirURI = new URI(inputDir.getAbsolutePath());
+    if (inputDirURI.getScheme() == null) {
+      inputDirURI = new File(inputDir.getAbsolutePath()).toURI();
+    }
+    PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
+    String includePattern = "glob:" + inputDir.getAbsolutePath() + "/**.csv";
+    String excludePattern = "glob:" + inputDir.getAbsolutePath() + 
"/2009/input2.csv";
+
+    List<String> files =
+        SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, 
inputDirURI, includePattern,
+            excludePattern, true);
+    Assert.assertEquals(files.size(), 1);
+  }
+
+  // Test that we throw an exception when there is no file matching.
+  @Test
+  public void testEmptyMatchFiles()
+      throws Exception {
+    File testDir = makeTestDir();
+    File inputDir = new File(testDir, "dir");
+    File inputSubDir1 = new File(inputDir, "2009");
+    inputSubDir1.mkdirs();
+
+    File inputFile1 = new File(inputDir, "input1.csv");
+    FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", 
"value1,1", "value2,2"));
+
+    File inputFile2 = new File(inputSubDir1, "input2.csv");
+    FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", 
"value3,3", "value4,4"));
+    URI inputDirURI = new File(inputDir.getAbsolutePath()).toURI();
+    PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
+    String includePattern = "glob:" + inputDir.getAbsolutePath() + "/**.json";
+    Assert.assertThrows(RuntimeException.class, () -> {
+      SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, 
inputDirURI, includePattern, null, true);
+    });
+  }
+
+  private File makeTestDir()
+      throws IOException {
+    File testDir = 
Files.createTempDirectory("testSegmentGeneration-").toFile();
+    testDir.delete();
+    testDir.mkdirs();
+    return testDir;
+  }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index 0608fd7192..0ea86376ec 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -26,8 +26,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.nio.file.FileSystems;
-import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -142,13 +140,14 @@ public class HadoopSegmentGenerationJobRunner extends 
Configured implements Inge
     for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
       PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
     }
-
-    //Get pinotFS for input
+    //Get list of files to process
     URI inputDirURI = new URI(_spec.getInputDirURI());
     if (inputDirURI.getScheme() == null) {
       inputDirURI = new File(_spec.getInputDirURI()).toURI();
     }
     PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
+    List<String> filteredFiles = 
SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, 
inputDirURI,
+        _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), 
_spec.isSearchRecursively());
 
     //Get outputFS for writing output pinot segments
     URI outputDirURI = new URI(_spec.getOutputDirURI());
@@ -181,71 +180,35 @@ public class HadoopSegmentGenerationJobRunner extends 
Configured implements Inge
     Path stagingSegmentTarUri = new Path(stagingDirURI.toString(), 
SEGMENT_TAR_SUBDIR_NAME);
     outputDirFS.mkdir(stagingSegmentTarUri.toUri());
 
-    //Get list of files to process
-    String[] files = inputDirFS.listFiles(inputDirURI, true);
-
-    //TODO: sort input files based on creation time
-    List<String> filteredFiles = new ArrayList<>();
-    PathMatcher includeFilePathMatcher = null;
-    if (_spec.getIncludeFileNamePattern() != null) {
-      includeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern());
-    }
-    PathMatcher excludeFilePathMatcher = null;
-    if (_spec.getExcludeFileNamePattern() != null) {
-      excludeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern());
-    }
+    // numDataFiles is guaranteed to be greater than zero since 
listMatchedFilesWithRecursiveOption will throw
+    // runtime exception if the matched files list is empty.
+    int numDataFiles = filteredFiles.size();
 
-    for (String file : files) {
-      if (includeFilePathMatcher != null) {
-        if (!includeFilePathMatcher.matches(Paths.get(file))) {
-          continue;
+    LOGGER.info("Creating segments with data files: {}", filteredFiles);
+    if 
(!SegmentGenerationJobUtils.useGlobalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec()))
 {
+      Map<String, List<String>> localDirIndex = new HashMap<>();
+      for (String filteredFile : filteredFiles) {
+        java.nio.file.Path filteredParentPath = 
Paths.get(filteredFile).getParent();
+        if (!localDirIndex.containsKey(filteredParentPath.toString())) {
+          localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
         }
+        localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
       }
-      if (excludeFilePathMatcher != null) {
-        if (excludeFilePathMatcher.matches(Paths.get(file))) {
-          continue;
+      for (String parentPath : localDirIndex.keySet()) {
+        List<String> siblingFiles = localDirIndex.get(parentPath);
+        Collections.sort(siblingFiles);
+        for (int i = 0; i < siblingFiles.size(); i++) {
+          URI inputFileURI = 
SegmentGenerationUtils.getFileURI(siblingFiles.get(i),
+              SegmentGenerationUtils.getDirectoryURI(parentPath));
+          createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, 
stagingInputDir, i);
         }
       }
-      if (!inputDirFS.isDirectory(new URI(file))) {
-        filteredFiles.add(file);
-      }
-    }
-
-    int numDataFiles = filteredFiles.size();
-    if (numDataFiles == 0) {
-      String errorMessage = String
-          .format("No data file founded in [%s], with include file pattern: 
[%s] and exclude file  pattern [%s]",
-              _spec.getInputDirURI(), _spec.getIncludeFileNamePattern(), 
_spec.getExcludeFileNamePattern());
-      LOGGER.error(errorMessage);
-      throw new RuntimeException(errorMessage);
     } else {
-      LOGGER.info("Creating segments with data files: {}", filteredFiles);
-      if 
(!SegmentGenerationJobUtils.useGlobalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec()))
 {
-        Map<String, List<String>> localDirIndex = new HashMap<>();
-        for (String filteredFile : filteredFiles) {
-          java.nio.file.Path filteredParentPath = 
Paths.get(filteredFile).getParent();
-          if (!localDirIndex.containsKey(filteredParentPath.toString())) {
-            localDirIndex.put(filteredParentPath.toString(), new 
ArrayList<>());
-          }
-          localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
-        }
-        for (String parentPath : localDirIndex.keySet()) {
-          List<String> siblingFiles = localDirIndex.get(parentPath);
-          Collections.sort(siblingFiles);
-          for (int i = 0; i < siblingFiles.size(); i++) {
-            URI inputFileURI = SegmentGenerationUtils
-                .getFileURI(siblingFiles.get(i), 
SegmentGenerationUtils.getDirectoryURI(parentPath));
-            createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, 
stagingInputDir, i);
-          }
-        }
-      } else {
-        for (int i = 0; i < numDataFiles; i++) {
-          URI inputFileURI = 
SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
-          createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, 
stagingInputDir, i);
-        }
+      for (int i = 0; i < numDataFiles; i++) {
+        URI inputFileURI = 
SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
+        createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, 
stagingInputDir, i);
       }
     }
-
     try {
       // Set up the job
       Job job = Job.getInstance(getConf());
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index ed73a2b719..ceaf2b1b9c 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -23,9 +23,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URLEncoder;
-import java.nio.file.FileSystems;
 import java.nio.file.Path;
-import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -128,14 +126,15 @@ public class SparkSegmentGenerationJobRunner implements 
IngestionJobRunner, Seri
     for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
       PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
     }
-
-    //Get pinotFS for input
+    //Get list of files to process
     URI inputDirURI = new URI(_spec.getInputDirURI());
     if (inputDirURI.getScheme() == null) {
       inputDirURI = new File(_spec.getInputDirURI()).toURI();
     }
     PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
-
+    List<String> filteredFiles = 
SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, 
inputDirURI,
+        _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), 
_spec.isSearchRecursively());
+    LOGGER.info("Found {} files to create Pinot segments!", 
filteredFiles.size());
     //Get outputFS for writing output pinot segments
     URI outputDirURI = new URI(_spec.getOutputDirURI());
     if (outputDirURI.getScheme() == null) {
@@ -159,46 +158,6 @@ public class SparkSegmentGenerationJobRunner implements 
IngestionJobRunner, Seri
       }
       outputDirFS.mkdir(stagingDirURI);
     }
-    //Get list of files to process
-    String[] files = inputDirFS.listFiles(inputDirURI, true);
-
-    //TODO: sort input files based on creation time
-    List<String> filteredFiles = new ArrayList<>();
-    PathMatcher includeFilePathMatcher = null;
-    if (_spec.getIncludeFileNamePattern() != null) {
-      includeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern());
-    }
-    PathMatcher excludeFilePathMatcher = null;
-    if (_spec.getExcludeFileNamePattern() != null) {
-      excludeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern());
-    }
-
-    for (String file : files) {
-      if (includeFilePathMatcher != null) {
-        if (!includeFilePathMatcher.matches(Paths.get(file))) {
-          continue;
-        }
-      }
-      if (excludeFilePathMatcher != null) {
-        if (excludeFilePathMatcher.matches(Paths.get(file))) {
-          continue;
-        }
-      }
-      if (!inputDirFS.isDirectory(new URI(file))) {
-        // In case PinotFS implementations list files without a scheme (e.g. 
hdfs://), then we may lose it in the
-        // input file path. Call SegmentGenerationUtils.getFileURI() to fix 
this up.
-        filteredFiles.add(SegmentGenerationUtils.getFileURI(file, 
inputDirURI).toString());
-      }
-    }
-
-    if (filteredFiles.isEmpty()) {
-      throw new RuntimeException(
-          String.format("No file found in the input directory: %s matching 
includeFileNamePattern: %s,"
-                  + " excludeFileNamePattern: %s", _spec.getInputDirURI(), 
_spec.getIncludeFileNamePattern(),
-              _spec.getExcludeFileNamePattern()));
-    }
-
-    LOGGER.info("Found {} files to create Pinot segments!", 
filteredFiles.size());
     try {
       JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java
index 3a5de7b0e1..4556f9fc65 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java
@@ -192,6 +192,7 @@ public class SparkSegmentGenerationJobRunnerTest {
     SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
     jobSpec.setJobType("SegmentCreation");
     jobSpec.setInputDirURI(inputDir.toURI().toString());
+    jobSpec.setSearchRecursively(true);
     jobSpec.setOutputDirURI(outputDir.toURI().toString());
     jobSpec.setOverwriteOutput(true);
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
index 6300040d4e..46db5818a2 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
@@ -23,9 +23,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URLEncoder;
-import java.nio.file.FileSystems;
 import java.nio.file.Path;
-import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -128,14 +126,15 @@ public class SparkSegmentGenerationJobRunner implements 
IngestionJobRunner, Seri
     for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
       PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
     }
-
-    //Get pinotFS for input
+    //Get list of files to process
     URI inputDirURI = new URI(_spec.getInputDirURI());
     if (inputDirURI.getScheme() == null) {
       inputDirURI = new File(_spec.getInputDirURI()).toURI();
     }
     PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
-
+    List<String> filteredFiles = 
SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, 
inputDirURI,
+        _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), 
_spec.isSearchRecursively());
+    LOGGER.info("Found {} files to create Pinot segments!", 
filteredFiles.size());
     //Get outputFS for writing output pinot segments
     URI outputDirURI = new URI(_spec.getOutputDirURI());
     if (outputDirURI.getScheme() == null) {
@@ -159,46 +158,6 @@ public class SparkSegmentGenerationJobRunner implements 
IngestionJobRunner, Seri
       }
       outputDirFS.mkdir(stagingDirURI);
     }
-    //Get list of files to process
-    String[] files = inputDirFS.listFiles(inputDirURI, true);
-
-    //TODO: sort input files based on creation time
-    List<String> filteredFiles = new ArrayList<>();
-    PathMatcher includeFilePathMatcher = null;
-    if (_spec.getIncludeFileNamePattern() != null) {
-      includeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern());
-    }
-    PathMatcher excludeFilePathMatcher = null;
-    if (_spec.getExcludeFileNamePattern() != null) {
-      excludeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern());
-    }
-
-    for (String file : files) {
-      if (includeFilePathMatcher != null) {
-        if (!includeFilePathMatcher.matches(Paths.get(file))) {
-          continue;
-        }
-      }
-      if (excludeFilePathMatcher != null) {
-        if (excludeFilePathMatcher.matches(Paths.get(file))) {
-          continue;
-        }
-      }
-      if (!inputDirFS.isDirectory(new URI(file))) {
-        // In case PinotFS implementations list files without a scheme (e.g. 
hdfs://), then we may lose it in the
-        // input file path. Call SegmentGenerationUtils.getFileURI() to fix 
this up.
-        filteredFiles.add(SegmentGenerationUtils.getFileURI(file, 
inputDirURI).toString());
-      }
-    }
-
-    if (filteredFiles.isEmpty()) {
-      throw new RuntimeException(
-          String.format("No file found in the input directory: %s matching 
includeFileNamePattern: %s,"
-                  + " excludeFileNamePattern: %s", _spec.getInputDirURI(), 
_spec.getIncludeFileNamePattern(),
-              _spec.getExcludeFileNamePattern()));
-    }
-
-    LOGGER.info("Found {} files to create Pinot segments!", 
filteredFiles.size());
     try {
       JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java
index 96c11e893b..52ea6d2d9d 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java
@@ -192,6 +192,7 @@ public class SparkSegmentGenerationJobRunnerTest {
     SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
     jobSpec.setJobType("SegmentCreation");
     jobSpec.setInputDirURI(inputDir.toURI().toString());
+    jobSpec.setSearchRecursively(true);
     jobSpec.setOutputDirURI(outputDir.toURI().toString());
     jobSpec.setOverwriteOutput(true);
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index 51e65f45b6..a3d721ed53 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -23,8 +23,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLEncoder;
-import java.nio.file.FileSystems;
-import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -172,37 +170,8 @@ public class SegmentGenerationJobRunner implements 
IngestionJobRunner {
   public void run()
       throws Exception {
     // Get list of files to process.
-    String[] files = _inputDirFS.listFiles(_inputDirURI, true);
-
-    // TODO - sort input files by modification timestamp. Though this is 
problematic because:
-    // a. It can put more load on the external filesystem (e.g. S3), and
-    // b. The call to Collections.sort(siblingFiles) below will reorder files 
by name.
-
-    List<String> filteredFiles = new ArrayList<>();
-    PathMatcher includeFilePathMatcher = null;
-    if (_spec.getIncludeFileNamePattern() != null) {
-      includeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern());
-    }
-    PathMatcher excludeFilePathMatcher = null;
-    if (_spec.getExcludeFileNamePattern() != null) {
-      excludeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern());
-    }
-
-    for (String file : files) {
-      if (includeFilePathMatcher != null) {
-        if (!includeFilePathMatcher.matches(Paths.get(file))) {
-          continue;
-        }
-      }
-      if (excludeFilePathMatcher != null) {
-        if (excludeFilePathMatcher.matches(Paths.get(file))) {
-          continue;
-        }
-      }
-      if (!_inputDirFS.isDirectory(new URI(file))) {
-        filteredFiles.add(file);
-      }
-    }
+    List<String> filteredFiles = 
SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(_inputDirFS, 
_inputDirURI,
+        _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), 
_spec.isSearchRecursively());
 
     File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + 
UUID.randomUUID());
     try {
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
index cd506fba2c..cd1ec2256b 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
@@ -125,6 +125,7 @@ public class SegmentGenerationJobRunnerTest {
     File schemaFile = makeSchemaFile(testDir, schemaName);
     File tableConfigFile = makeTableConfigFile(testDir, schemaName);
     SegmentGenerationJobSpec jobSpec = makeJobSpec(inputDir, outputDir, 
schemaFile, tableConfigFile);
+    jobSpec.setSearchRecursively(true);
     SegmentGenerationJobRunner jobRunner = new 
SegmentGenerationJobRunner(jobSpec);
     jobRunner.run();
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
index 145f443a4a..02e52b26c1 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
@@ -45,6 +45,12 @@ public class SegmentGenerationJobSpec implements 
Serializable {
    */
   private String _inputDirURI;
 
+  /**
+   * If true, search input files recursively from the root directory specified in 
_inputDirURI.
+   */
+  // TODO: set the default value to false after all clients are aware of this.
+  private boolean _searchRecursively = true;
+
   /**
    * include file name pattern, supported glob pattern.
    * Sample usage:
@@ -161,6 +167,14 @@ public class SegmentGenerationJobSpec implements 
Serializable {
     _inputDirURI = inputDirURI;
   }
 
+  public boolean isSearchRecursively() {
+    return _searchRecursively;
+  }
+
+  public void setSearchRecursively(boolean searchRecursively) {
+    _searchRecursively = searchRecursively;
+  }
+
   public String getIncludeFileNamePattern() {
     return _includeFileNamePattern;
   }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
index cc1fafad79..94270e20d9 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
@@ -48,6 +48,8 @@ public class IngestionJobLauncherTest {
         null, context, _defaultEnvironmentValues);
     Assert.assertEquals(spec.getInputDirURI(), 
"file:///path/to/input/2020/05/06");
     Assert.assertEquals(spec.getOutputDirURI(), 
"file:///path/to/output/2020/05/06");
+    // searchRecursively is set to false in the yaml file.
+    Assert.assertFalse(spec.isSearchRecursively());
   }
 
   @Test
@@ -56,6 +58,8 @@ public class IngestionJobLauncherTest {
         
GroovyTemplateUtils.class.getClassLoader().getResource("ingestion_job_spec_unicode.yaml").getFile(),
 null,
         null, null);
     Assert.assertEquals("\ufff0", 
spec.getRecordReaderSpec().getConfigs().get("multiValueDelimiter"));
+    // searchRecursively is set to true by default.
+    Assert.assertTrue(spec.isSearchRecursively());
   }
 
   @Test
diff --git a/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml 
b/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml
index e68bec2e38..665b0be218 100644
--- a/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml
+++ b/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml
@@ -26,6 +26,7 @@ executionFrameworkSpec:
 jobType: SegmentCreationAndTarPush
 
 inputDirURI: 'file:///path/to/input/${ year }/${ month }/${ day }'
+searchRecursively: false
 # includeFileNamePattern: include file name pattern, supported glob pattern.
 # Sample usage:
 #   'glob:*.avro' will include all avro files just under the inputDirURI, not 
sub directories;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to